You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by re...@apache.org on 2014/10/31 08:51:35 UTC

[02/10] renaming application.status topic to applications

http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppClusterTerminatingMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppClusterTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppClusterTerminatingMessageProcessor.java
new file mode 100644
index 0000000..9520f4e
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppClusterTerminatingMessageProcessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.applications.AppClusterTerminatingEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+
+public class AppClusterTerminatingMessageProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(AppClusterTerminatingMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (AppClusterTerminatingEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            AppClusterTerminatingEvent event = (AppClusterTerminatingEvent) Util.
+                                                jsonToObject(message, AppClusterTerminatingEvent.class);
+
+            if(log.isDebugEnabled()) {
+                log.debug("Received AppStatusClusterTerminatingEvent: " + event.toString());
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(String.format("Failed to process cluster activated message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppStatusMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppStatusMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppStatusMessageProcessorChain.java
new file mode 100644
index 0000000..1524b00
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppStatusMessageProcessorChain.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.listener.application.status.*;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+
+/**
+ * Application Status processor chain is to handle the list processors to parse the application
+ * status.
+ */
+public class AppStatusMessageProcessorChain extends MessageProcessorChain {
+    private static final Log log = LogFactory.getLog(AppStatusMessageProcessorChain.class);
+
+    private AppStatusClusterCreatedMessageProcessor clusterCreatedMessageProcessor;
+    private AppStatusClusterActivatedMessageProcessor clusterActivatedMessageProcessor;
+    private AppStatusClusterInactivateMessageProcessor clusterInActivateMessageProcessor;
+    private AppStatusClusterTerminatingMessageProcessor clusterTerminatingMessageProcessor;
+    private AppStatusClusterTerminatedMessageProcessor clusterTerminatedMessageProcessor;
+    private AppStatusGroupCreatedMessageProcessor groupCreatedMessageProcessor;
+    private AppStatusGroupActivatedMessageProcessor groupActivatedMessageProcessor;
+    private AppStatusGroupInactivatedMessageProcessor groupInActivateMessageProcessor;
+    private AppStatusApplicationActivatedMessageProcessor appActivatedMessageProcessor;
+    private AppStatusApplicationCreatedMessageProcessor applicationStatusAppCreatedMessageProcessor;
+    private AppStatusApplicationInactivatedMessageProcessor applicationStatusAppInActivatedMessageProcessor;
+    private AppStatusApplicationTerminatedMessageProcessor applicationStatusAppTerminatedMessageProcessor;
+    private AppStatusApplicationTerminatingMessageProcessor applicationStatusAppTerminatingMessageProcessor;
+
+    private AppStatusGroupTerminatedMessageProcessor groupTerminatedMessageProcessor;
+    private AppStatusGroupTerminatingMessageProcessor groupTerminatingMessageProcessor;
+
+    public void initialize() {
+        // Add instance notifier event processors
+        clusterCreatedMessageProcessor= new AppStatusClusterCreatedMessageProcessor();
+        add(clusterCreatedMessageProcessor);
+
+        clusterActivatedMessageProcessor = new AppStatusClusterActivatedMessageProcessor();
+        add(clusterActivatedMessageProcessor);
+
+        clusterInActivateMessageProcessor = new AppStatusClusterInactivateMessageProcessor();
+        add(clusterInActivateMessageProcessor);
+
+        clusterTerminatingMessageProcessor = new AppStatusClusterTerminatingMessageProcessor();
+        add(clusterTerminatingMessageProcessor);
+
+        clusterTerminatedMessageProcessor = new AppStatusClusterTerminatedMessageProcessor();
+        add(clusterTerminatedMessageProcessor);
+
+        groupCreatedMessageProcessor = new AppStatusGroupCreatedMessageProcessor();
+        add(groupCreatedMessageProcessor);
+
+        groupActivatedMessageProcessor = new AppStatusGroupActivatedMessageProcessor();
+        add(groupActivatedMessageProcessor);
+
+        groupInActivateMessageProcessor = new AppStatusGroupInactivatedMessageProcessor();
+        add(groupInActivateMessageProcessor);
+
+        appActivatedMessageProcessor = new AppStatusApplicationActivatedMessageProcessor();
+        add(appActivatedMessageProcessor);
+
+        applicationStatusAppCreatedMessageProcessor = new AppStatusApplicationCreatedMessageProcessor();
+        this.add(applicationStatusAppCreatedMessageProcessor);
+
+        applicationStatusAppInActivatedMessageProcessor = new AppStatusApplicationInactivatedMessageProcessor();
+        this.add(applicationStatusAppInActivatedMessageProcessor);
+
+        applicationStatusAppTerminatedMessageProcessor = new AppStatusApplicationTerminatedMessageProcessor();
+        this.add(applicationStatusAppTerminatedMessageProcessor);
+
+        applicationStatusAppTerminatingMessageProcessor = new AppStatusApplicationTerminatingMessageProcessor();
+        this.add(applicationStatusAppTerminatingMessageProcessor);
+
+        groupTerminatedMessageProcessor = new AppStatusGroupTerminatedMessageProcessor();
+        this.add(groupTerminatedMessageProcessor);
+
+        groupTerminatingMessageProcessor = new AppStatusGroupTerminatingMessageProcessor();
+        this.add(groupTerminatingMessageProcessor);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Instance notifier message processor chain initialized");
+        }
+    }
+
+    public void addEventListener(EventListener eventListener) {
+        if(eventListener instanceof AppStatusClusterCreatedEventListener) {
+            clusterCreatedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof AppStatusClusterActivatedEventListener) {
+            clusterActivatedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof AppStatusClusterInactivateEventListener) {
+            clusterInActivateMessageProcessor.addEventListener(eventListener);
+        } else if(eventListener instanceof AppStatusGroupCreatedEventListener) {
+            groupCreatedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof AppStatusGroupActivatedEventListener) {
+            groupActivatedMessageProcessor.addEventListener(eventListener);
+        } else if(eventListener instanceof AppStatusClusterTerminatedEventListener){
+            clusterTerminatedMessageProcessor.addEventListener(eventListener);
+        } else if(eventListener instanceof AppStatusClusterTerminatingEventListener){
+            clusterTerminatingMessageProcessor.addEventListener(eventListener);
+        }else if (eventListener instanceof AppStatusGroupInactivateEventListener) {
+            groupInActivateMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof AppStatusApplicationActivatedEventListener) {
+            appActivatedMessageProcessor.addEventListener(eventListener);
+        } else if(eventListener instanceof AppStatusApplicationInactivatedEventListener){
+            applicationStatusAppInActivatedMessageProcessor.addEventListener(eventListener);
+        } else if(eventListener instanceof AppStatusApplicationCreatedEventListener){
+            applicationStatusAppCreatedMessageProcessor.addEventListener(eventListener);
+        } else if(eventListener instanceof AppStatusApplicationTerminatingEventListener){
+            applicationStatusAppTerminatingMessageProcessor.addEventListener(eventListener);
+        } else if(eventListener instanceof AppStatusApplicationTerminatedEventListener){
+            applicationStatusAppTerminatedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof AppStatusGroupTerminatingEventListener){
+            groupTerminatingMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof AppStatusGroupTerminatedEventListener){
+            groupTerminatedMessageProcessor.addEventListener(eventListener);
+        } else
+        {
+            throw new RuntimeException("Unknown event listener " + eventListener.toString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationActivatedMessageProcessor.java
new file mode 100644
index 0000000..71bd50e
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationActivatedMessageProcessor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.applications.ApplicationActivatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class ApplicationActivatedMessageProcessor extends MessageProcessor {
+    private static final Log log =
+            LogFactory.getLog(ApplicationActivatedMessageProcessor.class);
+
+
+    private MessageProcessor nextProcessor;
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (ApplicationActivatedEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            ApplicationActivatedEvent event =
+                    (ApplicationActivatedEvent) Util.jsonToObject(message, ApplicationActivatedEvent.class);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Received ApplicationActivatedEvent in application status topic: " + event.toString());
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(
+                        String.format("Failed to process group activated message " +
+                                "using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java
new file mode 100644
index 0000000..db5d777
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.applications.ApplicationCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class ApplicationCreatedMessageProcessor extends MessageProcessor {
+    private static final Log log =
+            LogFactory.getLog(ApplicationCreatedMessageProcessor.class);
+
+
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (ApplicationCreatedEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            ApplicationCreatedEvent event =
+                    (ApplicationCreatedEvent) Util.jsonToObject(message, ApplicationCreatedEvent.class);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Received ApplicationCreated Event in application status topic: " + event.toString());
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(
+                        String.format("Failed to process group activated message " +
+                                "using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java
new file mode 100644
index 0000000..d8f2aac
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.applications.ApplicationInactivatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class ApplicationInactivatedMessageProcessor extends MessageProcessor {
+    private static final Log log =
+            LogFactory.getLog(ApplicationInactivatedMessageProcessor.class);
+
+
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (ApplicationInactivatedEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            ApplicationInactivatedEvent event =
+                    (ApplicationInactivatedEvent) Util.jsonToObject(message, ApplicationInactivatedEvent.class);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Received ApplicationInActivatedEvent in application status topic: " + event.toString());
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(
+                        String.format("Failed to process group activated message " +
+                                "using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
new file mode 100644
index 0000000..a121ffb
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.applications.ApplicationTerminatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class ApplicationTerminatedMessageProcessor extends MessageProcessor {
+    private static final Log log =
+            LogFactory.getLog(ApplicationTerminatedMessageProcessor.class);
+
+
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (ApplicationTerminatedEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            ApplicationTerminatedEvent event =
+                    (ApplicationTerminatedEvent) Util.jsonToObject(message, ApplicationTerminatedEvent.class);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Received ApplicationTerminatedEvent in application status topic: " + event.toString());
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(
+                        String.format("Failed to process group activated message " +
+                                "using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
new file mode 100644
index 0000000..280de2c
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.applications.ApplicationTerminatingEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class ApplicationTerminatingMessageProcessor extends MessageProcessor {
+    private static final Log log =
+            LogFactory.getLog(ApplicationTerminatingMessageProcessor.class);
+
+
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (ApplicationTerminatingEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            ApplicationTerminatingEvent event =
+                    (ApplicationTerminatingEvent) Util.jsonToObject(message, ApplicationTerminatingEvent.class);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Received ApplicationTerminatingEvent in application status topic: " + event.toString());
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(
+                        String.format("Failed to process group activated message " +
+                                "using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java
new file mode 100644
index 0000000..02ddda8
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.applications.GroupActivatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class GroupActivatedMessageProcessor extends MessageProcessor {
+    private static final Log log =
+            LogFactory.getLog(GroupActivatedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (GroupActivatedEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            GroupActivatedEvent event =
+                    (GroupActivatedEvent) Util.jsonToObject(message, GroupActivatedEvent.class);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Received GroupActivatedEvent: " + event.toString());
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(
+                        String.format("Failed to process group activated message " +
+                                "using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java
new file mode 100644
index 0000000..d04d7f9
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.applications.GroupCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class GroupCreatedMessageProcessor extends MessageProcessor {
+    private static final Log log =
+            LogFactory.getLog(GroupCreatedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (GroupCreatedEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            GroupCreatedEvent event =
+                    (GroupCreatedEvent) Util.jsonToObject(message, GroupCreatedEvent.class);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Received AppStatusGroupCreatedEvent: " + event.toString());
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(
+                        String.format("Failed to process group created message " +
+                                "using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java
new file mode 100644
index 0000000..6cf2587
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.applications.AppStatusGroupInactivateEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class GroupInactivatedMessageProcessor extends MessageProcessor {
+    private static final Log log =
+            LogFactory.getLog(GroupInactivatedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (AppStatusGroupInactivateEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            AppStatusGroupInactivateEvent event =
+                    (AppStatusGroupInactivateEvent) Util.jsonToObject(message, AppStatusGroupInactivateEvent.class);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Received GroupInActivateEvent: " + event.toString());
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(
+                        String.format("Failed to process group in activated message " +
+                                "using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java
new file mode 100644
index 0000000..a917a14
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.applications.GroupTerminatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class GroupTerminatedMessageProcessor extends MessageProcessor {
+    private static final Log log =
+            LogFactory.getLog(GroupTerminatedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (GroupTerminatedEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            GroupTerminatedEvent event =
+                    (GroupTerminatedEvent) Util.jsonToObject(message, GroupTerminatedEvent.class);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Received GroupTerminatingEvent: " + event.toString());
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(
+                        String.format("Failed to process group in GroupTerminatingEvent message " +
+                                "using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java
new file mode 100644
index 0000000..63c055d
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.applications.GroupTerminatingEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class GroupTerminatingMessageProcessor extends MessageProcessor {
+    private static final Log log =
+            LogFactory.getLog(GroupTerminatingMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (GroupTerminatingEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            GroupTerminatingEvent event =
+                    (GroupTerminatingEvent) Util.jsonToObject(message, GroupTerminatingEvent.class);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Received GroupTerminatingEvent: " + event.toString());
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(
+                        String.format("Failed to process group in GroupTerminatingEvent message " +
+                                "using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}