You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by re...@apache.org on 2014/12/05 14:40:28 UTC

[2/5] stratos git commit: adding instance prefix to events, , processor and listener

http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInstanceTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInstanceTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInstanceTerminatedMessageProcessor.java
new file mode 100644
index 0000000..c7a39b5
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInstanceTerminatedMessageProcessor.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Applications;
+import org.apache.stratos.messaging.event.applications.ApplicationInstanceTerminatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * This processor responsible to process the application Inactivation even and update the Topology.
+ */
+public class ApplicationInstanceTerminatedMessageProcessor extends MessageProcessor {
+    private static final Log log =
+            LogFactory.getLog(ApplicationInstanceTerminatedMessageProcessor.class);
+
+
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Applications applications = (Applications) object;
+
+        if (ApplicationInstanceTerminatedEvent.class.getName().equals(type)) {
+            // Return if applications has not been initialized
+            if (!applications.isInitialized())
+                return false;
+
+            // Parse complete message and build event
+            ApplicationInstanceTerminatedEvent event = (ApplicationInstanceTerminatedEvent) Util.
+                    jsonToObject(message, ApplicationInstanceTerminatedEvent.class);
+
+            ApplicationsUpdater.acquireWriteLockForApplications();
+
+            try {
+                return doProcess(event, applications);
+
+            } finally {
+                ApplicationsUpdater.releaseWriteLockForApplications();
+            }
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, applications);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+
+    private boolean doProcess(ApplicationInstanceTerminatedEvent event, Applications applications) {
+
+        // check if required properties are available
+        if (event.getAppId() == null) {
+            String errorMsg = "Application Id of application removed event is invalid";
+            log.error(errorMsg);
+            throw new RuntimeException(errorMsg);
+        }
+
+        // check if an Application with same name exists in applications
+        String appId = event.getAppId();
+        if (applications.applicationExists(appId)) {
+            log.warn("Application with id [ " + appId + " ] still exists in Applications, removing it");
+            applications.removeApplication(appId);
+        }
+
+        notifyEventListeners(event);
+        return true;
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInstanceTerminatingMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInstanceTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInstanceTerminatingMessageProcessor.java
new file mode 100644
index 0000000..7679091
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInstanceTerminatingMessageProcessor.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
+import org.apache.stratos.messaging.domain.applications.Applications;
+import org.apache.stratos.messaging.domain.instance.ApplicationInstance;
+import org.apache.stratos.messaging.event.applications.ApplicationInstanceTerminatingEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * This processor responsible to process the application Inactivation even and update the Topology.
+ */
+public class ApplicationInstanceTerminatingMessageProcessor extends MessageProcessor {
+    private static final Log log =
+            LogFactory.getLog(ApplicationInstanceTerminatingMessageProcessor.class);
+
+
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Applications applications = (Applications) object;
+
+        if (ApplicationInstanceTerminatingEvent.class.getName().equals(type)) {
+            // Return if applications has not been initialized
+            if (!applications.isInitialized())
+                return false;
+
+            // Parse complete message and build event
+            ApplicationInstanceTerminatingEvent event = (ApplicationInstanceTerminatingEvent) Util.
+                    jsonToObject(message, ApplicationInstanceTerminatingEvent.class);
+
+            ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
+
+            try {
+                return doProcess(event, applications);
+
+            } finally {
+                ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
+            }
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, applications);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+
+    private boolean doProcess(ApplicationInstanceTerminatingEvent event, Applications applications) {
+
+        // Validate event against the existing applications
+        Application application = applications.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] %s",
+                        event.getAppId()));
+            }
+            return false;
+        } else {
+            // Apply changes to the applications
+            ApplicationInstance context = application.getInstanceContexts(event.getInstanceId());
+            if(context == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Application Instance not exists in Group: [AppId] %s" +
+                            "[instanceId] %s", event.getAppId(), event.getInstanceId()));
+                    return false;
+                }
+            }
+            ApplicationStatus status = ApplicationStatus.Terminating;
+            if (!context.isStateTransitionValid(status)) {
+                log.error("Invalid State transfer from [ " + context.getStatus() +
+                        " ] to [ " + status + " ]");
+            }
+            context.setStatus(status);
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
deleted file mode 100644
index a6d2cd5..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Applications;
-import org.apache.stratos.messaging.event.applications.ApplicationInstanceTerminatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor responsible to process the application Inactivation even and update the Topology.
- */
-public class ApplicationTerminatedMessageProcessor extends MessageProcessor {
-    private static final Log log =
-            LogFactory.getLog(ApplicationTerminatedMessageProcessor.class);
-
-
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Applications applications = (Applications) object;
-
-        if (ApplicationInstanceTerminatedEvent.class.getName().equals(type)) {
-            // Return if applications has not been initialized
-            if (!applications.isInitialized())
-                return false;
-
-            // Parse complete message and build event
-            ApplicationInstanceTerminatedEvent event = (ApplicationInstanceTerminatedEvent) Util.
-                    jsonToObject(message, ApplicationInstanceTerminatedEvent.class);
-
-            ApplicationsUpdater.acquireWriteLockForApplications();
-
-            try {
-                return doProcess(event, applications);
-
-            } finally {
-                ApplicationsUpdater.releaseWriteLockForApplications();
-            }
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, applications);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-
-    private boolean doProcess(ApplicationInstanceTerminatedEvent event, Applications applications) {
-
-        // check if required properties are available
-        if (event.getAppId() == null) {
-            String errorMsg = "Application Id of application removed event is invalid";
-            log.error(errorMsg);
-            throw new RuntimeException(errorMsg);
-        }
-
-        // check if an Application with same name exists in applications
-        String appId = event.getAppId();
-        if (applications.applicationExists(appId)) {
-            log.warn("Application with id [ " + appId + " ] still exists in Applications, removing it");
-            applications.removeApplication(appId);
-        }
-
-        notifyEventListeners(event);
-        return true;
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
deleted file mode 100644
index 33ad474..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
-import org.apache.stratos.messaging.domain.applications.Applications;
-import org.apache.stratos.messaging.domain.instance.ApplicationInstance;
-import org.apache.stratos.messaging.event.applications.ApplicationInstanceTerminatingEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor responsible to process the application Inactivation even and update the Topology.
- */
-public class ApplicationTerminatingMessageProcessor extends MessageProcessor {
-    private static final Log log =
-            LogFactory.getLog(ApplicationTerminatingMessageProcessor.class);
-
-
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Applications applications = (Applications) object;
-
-        if (ApplicationInstanceTerminatingEvent.class.getName().equals(type)) {
-            // Return if applications has not been initialized
-            if (!applications.isInitialized())
-                return false;
-
-            // Parse complete message and build event
-            ApplicationInstanceTerminatingEvent event = (ApplicationInstanceTerminatingEvent) Util.
-                    jsonToObject(message, ApplicationInstanceTerminatingEvent.class);
-
-            ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
-
-            try {
-                return doProcess(event, applications);
-
-            } finally {
-                ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
-            }
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, applications);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-
-    private boolean doProcess(ApplicationInstanceTerminatingEvent event, Applications applications) {
-
-        // Validate event against the existing applications
-        Application application = applications.getApplication(event.getAppId());
-        if (application == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Application does not exist: [service] %s",
-                        event.getAppId()));
-            }
-            return false;
-        } else {
-            // Apply changes to the applications
-            ApplicationInstance context = application.getInstanceContexts(event.getInstanceId());
-            if(context == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Application Instance not exists in Group: [AppId] %s" +
-                            "[instanceId] %s", event.getAppId(), event.getInstanceId()));
-                    return false;
-                }
-            }
-            ApplicationStatus status = ApplicationStatus.Terminating;
-            if (!context.isStateTransitionValid(status)) {
-                log.error("Invalid State transfer from [ " + context.getStatus() +
-                        " ] to [ " + status + " ]");
-            }
-            context.setStatus(status);
-        }
-
-        // Notify event listeners
-        notifyEventListeners(event);
-        return true;
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationsMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationsMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationsMessageProcessorChain.java
index a420eea..b88a3ec 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationsMessageProcessorChain.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationsMessageProcessorChain.java
@@ -31,52 +31,52 @@ import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
 public class ApplicationsMessageProcessorChain extends MessageProcessorChain {
     private static final Log log = LogFactory.getLog(ApplicationsMessageProcessorChain.class);
 
-    private GroupResetProcessor groupCreatedMessageProcessor;
-    private GroupActivatedProcessor groupActivatedMessageProcessor;
-    private GroupInActivateProcessor groupInActivateMessageProcessor;
-    private GroupTerminatedProcessor groupTerminatedProcessor;
-    private GroupTerminatingProcessor groupTerminatingProcessor;
-    private ApplicationActivatedMessageProcessor applicationActivatedMessageProcessor;
+    private GroupInstanceCreatedProcessor groupCreatedMessageProcessor;
+    private GroupInstanceActivatedProcessor groupActivatedMessageProcessor;
+    private GroupInstanceInActivateProcessor groupInActivateMessageProcessor;
+    private GroupInstanceTerminatedProcessor groupTerminatedProcessor;
+    private GroupInstanceTerminatingProcessor groupTerminatingProcessor;
+    private ApplicationInstanceActivatedMessageProcessor applicationActivatedMessageProcessor;
     private ApplicationCreatedMessageProcessor applicationCreatedMessageProcessor;
-    private ApplicationInactivatedMessageProcessor applicationInactivatedMessageProcessor;
-    private ApplicationTerminatedMessageProcessor applicationTerminatedMessageProcessor;
-    private ApplicationTerminatingMessageProcessor applicationTerminatingMessageProcessor;
+    private ApplicationInstanceInactivatedMessageProcessor applicationInactivatedMessageProcessor;
+    private ApplicationInstanceTerminatedMessageProcessor applicationTerminatedMessageProcessor;
+    private ApplicationInstanceTerminatingMessageProcessor applicationTerminatingMessageProcessor;
     private CompleteApplicationsMessageProcessor completeApplicationsMessageProcessor;
 
     public void initialize() {
         // Add instance notifier event processors
 
-        groupCreatedMessageProcessor = new GroupResetProcessor();
+        groupCreatedMessageProcessor = new GroupInstanceCreatedProcessor();
         add(groupCreatedMessageProcessor);
 
-        groupActivatedMessageProcessor = new GroupActivatedProcessor();
+        groupActivatedMessageProcessor = new GroupInstanceActivatedProcessor();
         add(groupActivatedMessageProcessor);
 
-        groupInActivateMessageProcessor = new GroupInActivateProcessor();
+        groupInActivateMessageProcessor = new GroupInstanceInActivateProcessor();
         add(groupInActivateMessageProcessor);
 
-        groupTerminatedProcessor = new GroupTerminatedProcessor();
+        groupTerminatedProcessor = new GroupInstanceTerminatedProcessor();
         add(groupTerminatedProcessor);
 
-        groupTerminatingProcessor = new GroupTerminatingProcessor();
+        groupTerminatingProcessor = new GroupInstanceTerminatingProcessor();
         add(groupTerminatingProcessor);
 
-        applicationActivatedMessageProcessor = new ApplicationActivatedMessageProcessor();
+        applicationActivatedMessageProcessor = new ApplicationInstanceActivatedMessageProcessor();
         add(applicationActivatedMessageProcessor);
 
         applicationCreatedMessageProcessor = new ApplicationCreatedMessageProcessor();
         add(applicationCreatedMessageProcessor);
 
-        applicationInactivatedMessageProcessor = new ApplicationInactivatedMessageProcessor();
+        applicationInactivatedMessageProcessor = new ApplicationInstanceInactivatedMessageProcessor();
         add(applicationInactivatedMessageProcessor);
 
-        applicationTerminatingMessageProcessor = new ApplicationTerminatingMessageProcessor();
+        applicationTerminatingMessageProcessor = new ApplicationInstanceTerminatingMessageProcessor();
         add(applicationTerminatingMessageProcessor);
 
         completeApplicationsMessageProcessor = new CompleteApplicationsMessageProcessor();
         add(completeApplicationsMessageProcessor);
 
-        applicationTerminatedMessageProcessor = new ApplicationTerminatedMessageProcessor();
+        applicationTerminatedMessageProcessor = new ApplicationInstanceTerminatedMessageProcessor();
         add(applicationTerminatedMessageProcessor);
 
         if (log.isDebugEnabled()) {
@@ -86,25 +86,25 @@ public class ApplicationsMessageProcessorChain extends MessageProcessorChain {
 
     public void addEventListener(EventListener eventListener) {
 
-        if (eventListener instanceof GroupResetEventListener) {
+        if (eventListener instanceof GroupInstanceCreatedEventListener) {
             groupCreatedMessageProcessor.addEventListener(eventListener);
-        } else if (eventListener instanceof GroupInactivateEventListener) {
+        } else if (eventListener instanceof GroupInstanceInactivateEventListener) {
             groupInActivateMessageProcessor.addEventListener(eventListener);
-        } else if (eventListener instanceof GroupActivatedEventListener) {
+        } else if (eventListener instanceof GroupInstanceActivatedEventListener) {
             groupActivatedMessageProcessor.addEventListener(eventListener);
-        } else if (eventListener instanceof GroupTerminatingEventListener) {
+        } else if (eventListener instanceof GroupInstanceTerminatingEventListener) {
             groupTerminatingProcessor.addEventListener(eventListener);
-        } else if (eventListener instanceof GroupTerminatedEventListener) {
+        } else if (eventListener instanceof GroupInstanceTerminatedEventListener) {
             groupTerminatedProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof ApplicationCreatedEventListener) {
             applicationCreatedMessageProcessor.addEventListener(eventListener);
-        } else if (eventListener instanceof ApplicationActivatedEventListener) {
+        } else if (eventListener instanceof ApplicationInstanceActivatedEventListener) {
             applicationActivatedMessageProcessor.addEventListener(eventListener);
-        } else if (eventListener instanceof ApplicationInactivatedEventListener) {
+        } else if (eventListener instanceof ApplicationInstanceInactivatedEventListener) {
             applicationInactivatedMessageProcessor.addEventListener(eventListener);
-        } else if (eventListener instanceof ApplicationTerminatingEventListener) {
+        } else if (eventListener instanceof ApplicationInstanceTerminatingEventListener) {
             applicationTerminatingMessageProcessor.addEventListener(eventListener);
-        } else if (eventListener instanceof ApplicationTerminatedEventListener) {
+        } else if (eventListener instanceof ApplicationInstanceTerminatedEventListener) {
             applicationTerminatedMessageProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof CompleteApplicationsEventListener) {
             completeApplicationsMessageProcessor.addEventListener(eventListener);

http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java
deleted file mode 100644
index f37a7f4..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.Applications;
-import org.apache.stratos.messaging.domain.applications.Group;
-import org.apache.stratos.messaging.domain.applications.GroupStatus;
-import org.apache.stratos.messaging.domain.instance.GroupInstance;
-import org.apache.stratos.messaging.event.applications.GroupActivatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor will act upon the Group activation events
- */
-public class GroupActivatedProcessor extends MessageProcessor {
-    private static final Log log = LogFactory.getLog(GroupActivatedProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Applications applications = (Applications) object;
-
-        if (GroupActivatedEvent.class.getName().equals(type)) {
-            // Return if applications has not been initialized
-            if (!applications.isInitialized())
-                return false;
-
-            // Parse complete message and build event
-            GroupActivatedEvent event = (GroupActivatedEvent) Util.
-                    jsonToObject(message, GroupActivatedEvent.class);
-
-            ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
-
-            try {
-                return doProcess(event, applications);
-
-            } finally {
-                ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
-            }
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, applications);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-
-    private boolean doProcess(GroupActivatedEvent event, Applications applications) {
-
-        // Validate event against the existing topology
-        Application application = applications.getApplication(event.getAppId());
-        if (application == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Application does not exist: [service] %s",
-                        event.getAppId()));
-            }
-            return false;
-        }
-        Group group = application.getGroupRecursively(event.getGroupId());
-
-        if (group == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
-                        event.getGroupId()));
-                return false;
-            }
-        } else {
-            GroupInstance context = group.getInstanceContexts(event.getInstanceId());
-            if(context == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Group Instance not exists in Group: [AppId] %s [groupId] %s " +
-                                    "[instanceId] %s", event.getAppId(), event.getGroupId(),
-                            event.getInstanceId()));
-                    return false;
-                }
-            }
-            // Apply changes to the topology
-            GroupStatus status = GroupStatus.Active;
-            if (!context.isStateTransitionValid(status)) {
-                log.error("Invalid State Transition from " + context.getStatus() + " to " +
-                        status);
-                return false;
-            }
-            context.setStatus(status);
-        }
-
-        // Notify event listeners
-        notifyEventListeners(event);
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java
deleted file mode 100644
index 41a2881..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.Applications;
-import org.apache.stratos.messaging.domain.applications.Group;
-import org.apache.stratos.messaging.domain.applications.GroupStatus;
-import org.apache.stratos.messaging.domain.instance.GroupInstance;
-import org.apache.stratos.messaging.event.applications.GroupInactivatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor will act upon the Group activation events
- */
-public class GroupInActivateProcessor extends MessageProcessor {
-    private static final Log log = LogFactory.getLog(GroupInActivateProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Applications applications = (Applications) object;
-
-        if (GroupInactivatedEvent.class.getName().equals(type)) {
-            // Return if applications has not been initialized
-            if (!applications.isInitialized())
-                return false;
-
-            // Parse complete message and build event
-            GroupInactivatedEvent event = (GroupInactivatedEvent) Util.
-                    jsonToObject(message, GroupInactivatedEvent.class);
-
-            ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
-
-            try {
-                return doProcess(event, applications);
-
-            } finally {
-                ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
-            }
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, applications);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-
-    private boolean doProcess(GroupInactivatedEvent event, Applications applications) {
-
-        // Validate event against the existing applications
-        Application application = applications.getApplication(event.getAppId());
-        if (application == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Application does not exist: [service] %s",
-                        event.getAppId()));
-            }
-            return false;
-        }
-        Group group = application.getGroupRecursively(event.getGroupId());
-
-        if (group == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
-                        event.getGroupId()));
-                return false;
-            }
-        } else {
-            GroupInstance context = group.getInstanceContexts(event.getInstanceId());
-            if(context == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Group Instance not exists in Group: [AppId] %s [groupId] %s " +
-                                    "[instanceId] %s", event.getAppId(), event.getGroupId(),
-                            event.getInstanceId()));
-                    return false;
-                }
-            }
-            // Apply changes to the topology
-            GroupStatus status = GroupStatus.Inactive;
-            if (!context.isStateTransitionValid(status)) {
-                log.error("Invalid State Transition from " + context.getStatus() + " to " +
-                        status);
-                return false;
-            }
-            context.setStatus(status);
-        }
-
-        // Notify event listeners
-        notifyEventListeners(event);
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceActivatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceActivatedProcessor.java
new file mode 100644
index 0000000..c74d176
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceActivatedProcessor.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Applications;
+import org.apache.stratos.messaging.domain.applications.Group;
+import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.instance.GroupInstance;
+import org.apache.stratos.messaging.event.applications.GroupInstanceActivatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * This processor will act upon the Group activation events
+ */
+public class GroupInstanceActivatedProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(GroupInstanceActivatedProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Applications applications = (Applications) object;
+
+        if (GroupInstanceActivatedEvent.class.getName().equals(type)) {
+            // Return if applications has not been initialized
+            if (!applications.isInitialized())
+                return false;
+
+            // Parse complete message and build event
+            GroupInstanceActivatedEvent event = (GroupInstanceActivatedEvent) Util.
+                    jsonToObject(message, GroupInstanceActivatedEvent.class);
+
+            ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
+
+            try {
+                return doProcess(event, applications);
+
+            } finally {
+                ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
+            }
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, applications);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+
+    private boolean doProcess(GroupInstanceActivatedEvent event, Applications applications) {
+
+        // Validate event against the existing topology
+        Application application = applications.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] %s",
+                        event.getAppId()));
+            }
+            return false;
+        }
+        Group group = application.getGroupRecursively(event.getGroupId());
+
+        if (group == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
+                        event.getGroupId()));
+                return false;
+            }
+        } else {
+            GroupInstance context = group.getInstanceContexts(event.getInstanceId());
+            if(context == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Group Instance not exists in Group: [AppId] %s [groupId] %s " +
+                                    "[instanceId] %s", event.getAppId(), event.getGroupId(),
+                            event.getInstanceId()));
+                    return false;
+                }
+            }
+            // Apply changes to the topology
+            GroupStatus status = GroupStatus.Active;
+            if (!context.isStateTransitionValid(status)) {
+                log.error("Invalid State Transition from " + context.getStatus() + " to " +
+                        status);
+                return false;
+            }
+            context.setStatus(status);
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceCreatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceCreatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceCreatedProcessor.java
new file mode 100644
index 0000000..aa17fbf
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceCreatedProcessor.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Applications;
+import org.apache.stratos.messaging.domain.applications.Group;
+import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.instance.GroupInstance;
+import org.apache.stratos.messaging.event.applications.GroupInstanceCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * This processor will act upon the Group activation events
+ */
+public class GroupInstanceCreatedProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(GroupInstanceCreatedProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Applications applications = (Applications) object;
+
+        if (GroupInstanceCreatedEvent.class.getName().equals(type)) {
+            // Return if applications has not been initialized
+            if (!applications.isInitialized())
+                return false;
+
+            // Parse complete message and build event
+            GroupInstanceCreatedEvent event = (GroupInstanceCreatedEvent) Util.
+                    jsonToObject(message, GroupInstanceCreatedEvent.class);
+
+            ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
+
+            try {
+                return doProcess(event, applications);
+
+            } finally {
+                ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
+            }
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, applications);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+
+    private boolean doProcess(GroupInstanceCreatedEvent event, Applications applications) {
+
+        // Validate event against the existing applications
+        Application application = applications.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] %s",
+                        event.getAppId()));
+            }
+            return false;
+        }
+        Group group = application.getGroupRecursively(event.getGroupId());
+
+        if (group == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
+                        event.getGroupId()));
+                return false;
+            }
+        } else {
+            // Apply changes to the applications
+            String instanceId = event.getGroupInstance().getInstanceId();
+            GroupInstance context = group.getInstanceContexts(instanceId);
+            if(context == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Group Instance not exists in Group: [AppId] %s [groupId] %s " +
+                                    "[instanceId] %s", event.getAppId(), event.getGroupId(),
+                            instanceId));
+                    return false;
+                }
+            }
+            // Apply changes to the topology
+            group.addInstance(instanceId, event.getGroupInstance());
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceInActivateProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceInActivateProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceInActivateProcessor.java
new file mode 100644
index 0000000..c1e00bf
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceInActivateProcessor.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Applications;
+import org.apache.stratos.messaging.domain.applications.Group;
+import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.instance.GroupInstance;
+import org.apache.stratos.messaging.event.applications.GroupInstanceInactivatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * This processor will act upon the Group activation events
+ */
+public class GroupInstanceInActivateProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(GroupInstanceInActivateProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Applications applications = (Applications) object;
+
+        if (GroupInstanceInactivatedEvent.class.getName().equals(type)) {
+            // Return if applications has not been initialized
+            if (!applications.isInitialized())
+                return false;
+
+            // Parse complete message and build event
+            GroupInstanceInactivatedEvent event = (GroupInstanceInactivatedEvent) Util.
+                    jsonToObject(message, GroupInstanceInactivatedEvent.class);
+
+            ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
+
+            try {
+                return doProcess(event, applications);
+
+            } finally {
+                ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
+            }
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, applications);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+
+    private boolean doProcess(GroupInstanceInactivatedEvent event, Applications applications) {
+
+        // Validate event against the existing applications
+        Application application = applications.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] %s",
+                        event.getAppId()));
+            }
+            return false;
+        }
+        Group group = application.getGroupRecursively(event.getGroupId());
+
+        if (group == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
+                        event.getGroupId()));
+                return false;
+            }
+        } else {
+            GroupInstance context = group.getInstanceContexts(event.getInstanceId());
+            if(context == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Group Instance not exists in Group: [AppId] %s [groupId] %s " +
+                                    "[instanceId] %s", event.getAppId(), event.getGroupId(),
+                            event.getInstanceId()));
+                    return false;
+                }
+            }
+            // Apply changes to the topology
+            GroupStatus status = GroupStatus.Inactive;
+            if (!context.isStateTransitionValid(status)) {
+                log.error("Invalid State Transition from " + context.getStatus() + " to " +
+                        status);
+                return false;
+            }
+            context.setStatus(status);
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceTerminatingProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceTerminatingProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceTerminatingProcessor.java
new file mode 100644
index 0000000..e3a8708
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceTerminatingProcessor.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Applications;
+import org.apache.stratos.messaging.domain.applications.Group;
+import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.instance.GroupInstance;
+import org.apache.stratos.messaging.event.applications.GroupInstanceTerminatingEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * This processor will act upon the Group activation events
+ */
+public class GroupInstanceTerminatingProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(GroupInstanceTerminatingProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Applications applications = (Applications) object;
+
+        if (GroupInstanceTerminatingEvent.class.getName().equals(type)) {
+            // Return if applications has not been initialized
+            if (!applications.isInitialized())
+                return false;
+
+            // Parse complete message and build event
+            GroupInstanceTerminatingEvent event = (GroupInstanceTerminatingEvent) Util.
+                    jsonToObject(message, GroupInstanceTerminatingEvent.class);
+
+            ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
+
+            try {
+                return doProcess(event, applications);
+
+            } finally {
+                ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
+            }
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, applications);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+
+    private boolean doProcess(GroupInstanceTerminatingEvent event, Applications applications) {
+
+        // Validate event against the existing applications
+        Application application = applications.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] %s",
+                        event.getAppId()));
+            }
+            return false;
+        }
+        Group group = application.getGroupRecursively(event.getGroupId());
+
+        if (group == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
+                        event.getGroupId()));
+                return false;
+            }
+        } else {
+            // Apply changes to the applications
+            GroupInstance context = group.getInstanceContexts(event.getInstanceId());
+            if(context == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Group Instance not exists in Group: [AppId] %s [groupId] %s " +
+                                    "[instanceId] %s", event.getAppId(), event.getGroupId(),
+                            event.getInstanceId()));
+                    return false;
+                }
+            }
+            // Apply changes to the topology
+            GroupStatus status = GroupStatus.Terminating;
+            if (!context.isStateTransitionValid(status)) {
+                log.error("Invalid State Transition from " + context.getStatus() + " to " +
+                        status);
+                return false;
+            }
+            context.setStatus(status);
+
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupResetProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupResetProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupResetProcessor.java
deleted file mode 100644
index f999ff3..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupResetProcessor.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.Applications;
-import org.apache.stratos.messaging.domain.applications.Group;
-import org.apache.stratos.messaging.domain.applications.GroupStatus;
-import org.apache.stratos.messaging.domain.instance.GroupInstance;
-import org.apache.stratos.messaging.event.applications.GroupResetEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor will act upon the Group activation events
- */
-public class GroupResetProcessor extends MessageProcessor {
-    private static final Log log = LogFactory.getLog(GroupResetProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Applications applications = (Applications) object;
-
-        if (GroupResetEvent.class.getName().equals(type)) {
-            // Return if applications has not been initialized
-            if (!applications.isInitialized())
-                return false;
-
-            // Parse complete message and build event
-            GroupResetEvent event = (GroupResetEvent) Util.
-                    jsonToObject(message, GroupResetEvent.class);
-
-            ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
-
-            try {
-                return doProcess(event, applications);
-
-            } finally {
-                ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
-            }
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, applications);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-
-    private boolean doProcess(GroupResetEvent event, Applications applications) {
-
-        // Validate event against the existing applications
-        Application application = applications.getApplication(event.getAppId());
-        if (application == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Application does not exist: [service] %s",
-                        event.getAppId()));
-            }
-            return false;
-        }
-        Group group = application.getGroupRecursively(event.getGroupId());
-
-        if (group == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
-                        event.getGroupId()));
-                return false;
-            }
-        } else {
-            // Apply changes to the applications
-            GroupInstance context = group.getInstanceContexts(event.getInstanceId());
-            if(context == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Group Instance not exists in Group: [AppId] %s [groupId] %s " +
-                                    "[instanceId] %s", event.getAppId(), event.getGroupId(),
-                            event.getInstanceId()));
-                    return false;
-                }
-            }
-            // Apply changes to the topology
-            GroupStatus status = GroupStatus.Inactive;
-            if (!context.isStateTransitionValid(status)) {
-                log.error("Invalid State Transition from " + context.getStatus() + " to " +
-                        status);
-                return false;
-            }
-            context.setStatus(status);
-        }
-
-        // Notify event listeners
-        notifyEventListeners(event);
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java
deleted file mode 100644
index d263174..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.Applications;
-import org.apache.stratos.messaging.domain.applications.Group;
-import org.apache.stratos.messaging.domain.applications.GroupStatus;
-import org.apache.stratos.messaging.domain.instance.GroupInstance;
-import org.apache.stratos.messaging.event.applications.GroupTerminatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor will act upon the Group activation events
- */
-public class GroupTerminatedProcessor extends MessageProcessor {
-    private static final Log log = LogFactory.getLog(GroupTerminatedProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Applications applications = (Applications) object;
-
-        if (GroupTerminatedEvent.class.getName().equals(type)) {
-            // Return if applications has not been initialized
-            if (!applications.isInitialized())
-                return false;
-
-            // Parse complete message and build event
-            GroupTerminatedEvent event = (GroupTerminatedEvent) Util.
-                    jsonToObject(message, GroupTerminatedEvent.class);
-
-            ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
-
-            try {
-                return doProcess(event, applications);
-
-            } finally {
-                ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
-            }
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, applications);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-
-    private boolean doProcess(GroupTerminatedEvent event, Applications applications) {
-
-        // Validate event against the existing applications
-        Application application = applications.getApplication(event.getAppId());
-        if (application == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Application does not exist: [service] %s",
-                        event.getAppId()));
-            }
-            return false;
-        }
-        Group group = application.getGroupRecursively(event.getGroupId());
-
-        if (group == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
-                        event.getGroupId()));
-                return false;
-            }
-        } else {
-            GroupInstance context = group.getInstanceContexts(event.getInstanceId());
-            if(context == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Group Instance not exists in Group: [AppId] %s [groupId] %s " +
-                                    "[instanceId] %s", event.getAppId(), event.getGroupId(),
-                            event.getInstanceId()));
-                    return false;
-                }
-            }
-            // Apply changes to the topology
-            GroupStatus status = GroupStatus.Terminated;
-            if (!context.isStateTransitionValid(status)) {
-                log.error("Invalid State Transition from " + context.getStatus() + " to " +
-                        status);
-                return false;
-            }
-            context.setStatus(status);
-
-        }
-
-        // Notify event listeners
-        notifyEventListeners(event);
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java
deleted file mode 100644
index 69df56b..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.Applications;
-import org.apache.stratos.messaging.domain.applications.Group;
-import org.apache.stratos.messaging.domain.applications.GroupStatus;
-import org.apache.stratos.messaging.domain.instance.GroupInstance;
-import org.apache.stratos.messaging.event.applications.GroupTerminatingEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor will act upon the Group activation events
- */
-public class GroupTerminatingProcessor extends MessageProcessor {
-    private static final Log log = LogFactory.getLog(GroupTerminatingProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Applications applications = (Applications) object;
-
-        if (GroupTerminatingEvent.class.getName().equals(type)) {
-            // Return if applications has not been initialized
-            if (!applications.isInitialized())
-                return false;
-
-            // Parse complete message and build event
-            GroupTerminatingEvent event = (GroupTerminatingEvent) Util.
-                    jsonToObject(message, GroupTerminatingEvent.class);
-
-            ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
-
-            try {
-                return doProcess(event, applications);
-
-            } finally {
-                ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
-            }
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, applications);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-
-    private boolean doProcess(GroupTerminatingEvent event, Applications applications) {
-
-        // Validate event against the existing applications
-        Application application = applications.getApplication(event.getAppId());
-        if (application == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Application does not exist: [service] %s",
-                        event.getAppId()));
-            }
-            return false;
-        }
-        Group group = application.getGroupRecursively(event.getGroupId());
-
-        if (group == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
-                        event.getGroupId()));
-                return false;
-            }
-        } else {
-            // Apply changes to the applications
-            GroupInstance context = group.getInstanceContexts(event.getInstanceId());
-            if(context == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Group Instance not exists in Group: [AppId] %s [groupId] %s " +
-                                    "[instanceId] %s", event.getAppId(), event.getGroupId(),
-                            event.getInstanceId()));
-                    return false;
-                }
-            }
-            // Apply changes to the topology
-            GroupStatus status = GroupStatus.Terminating;
-            if (!context.isStateTransitionValid(status)) {
-                log.error("Invalid State Transition from " + context.getStatus() + " to " +
-                        status);
-                return false;
-            }
-            context.setStatus(status);
-
-        }
-
-        // Notify event listeners
-        notifyEventListeners(event);
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
deleted file mode 100644
index 56f98cc..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.instance.ClusterInstance;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ClusterActivatedEvent;
-import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
-import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor will act upon the cluster activated event
- */
-public class ClusterActivatedProcessor extends MessageProcessor {
-    private static final Log log = LogFactory.getLog(ClusterActivatedProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-
-        Topology topology = (Topology) object;
-
-        if (ClusterActivatedEvent.class.getName().equals(type)) {
-            // Return if topology has not been initialized
-            if (!topology.isInitialized()) {
-                return false;
-            }
-
-            // Parse complete message and build event
-            ClusterActivatedEvent event = (ClusterActivatedEvent) Util.
-                    jsonToObject(message, ClusterActivatedEvent.class);
-
-            TopologyUpdater.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
-            try {
-                return doProcess(event, topology);
-
-            } finally {
-                TopologyUpdater.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
-            }
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-
-    private boolean doProcess(ClusterActivatedEvent event, Topology topology) {
-
-        // Apply service filter
-        if (TopologyServiceFilter.getInstance().isActive()) {
-            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                // Service is excluded, do not update topology or fire event
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                }
-                return false;
-            }
-        }
-
-        // Apply cluster filter
-        if (TopologyClusterFilter.getInstance().isActive()) {
-            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                // Cluster is excluded, do not update topology or fire event
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                }
-                return false;
-            }
-        }
-
-        // Validate event against the existing topology
-        Service service = topology.getService(event.getServiceName());
-        if (service == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Service does not exist: [service] %s",
-                        event.getServiceName()));
-            }
-            return false;
-        }
-        Cluster cluster = service.getCluster(event.getClusterId());
-
-        if (cluster == null) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(),
-                        event.getClusterId()));
-                return false;
-            }
-        } else {
-            // Apply changes to the topology
-            ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId());
-            if(context == null) {
-                log.warn("Cluster Instance Context is not found for [cluster] " +
-                        event.getClusterId() + " [instance-id] " +
-                        event.getInstanceId());
-                return false;
-            }
-            ClusterStatus status = ClusterStatus.Active;
-            if (!context.isStateTransitionValid(status)) {
-                log.error("Invalid State Transition from " + context.getStatus() + " to " + status);
-            }
-            context.setStatus(status);
-
-        }
-
-        // Notify event listeners
-        notifyEventListeners(event);
-        return true;
-    }
-
-}