You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/12/05 11:19:29 UTC

[1/2] Moved complete topology message processor to topology message processor chain and renamed event processors to message processors

Updated Branches:
  refs/heads/master 04add8436 -> f45707a96


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
new file mode 100644
index 0000000..eeae3d3
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.MemberStatus;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+public class MemberStartedMessageProcessor extends MessageProcessor { // Chain link that handles MemberStartedEvent messages
+
+    private static final Log log = LogFactory.getLog(MemberStartedMessageProcessor.class);
+    private MessageProcessor nextProcessor; // Next link in the chain; null until setNext() is called
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (MemberStartedEvent.class.getName().equals(type)) {
+            // Ignore the event (return false) while the topology has not been initialized
+            if (!topology.isInitialized())
+                return false;
+
+            // Build the event object from the message body via Util.jsonToObject
+            MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(message, MemberStartedEvent.class);
+
+            // Apply service filter (only consulted when the filter is active)
+            if (ServiceFilter.getInstance().isActive()) {
+                if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+                    // Service is excluded: do not update topology or fire event, return false
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+                    }
+                    return false;
+                }
+            }
+
+            // Apply cluster filter (only consulted when the filter is active)
+            if (ClusterFilter.getInstance().isActive()) {
+                if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+                    // Cluster is excluded: do not update topology or fire event, return false
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                    }
+                    return false;
+                }
+            }
+
+            // Validate event against the existing topology; each failed lookup logs a warning and returns false
+            Service service = topology.getService(event.getServiceName());
+            if (service == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Service does not exist: [service] %s",
+                            event.getServiceName()));
+                }
+                return false;
+            }
+            Cluster cluster = service.getCluster(event.getClusterId());
+            if (cluster == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                            event.getServiceName(), event.getClusterId()));
+                }
+                return false;
+            }
+            Member member = cluster.getMember(event.getMemberId());
+            if (member == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+                            event.getServiceName(),
+                            event.getClusterId(),
+                            event.getMemberId()));
+                }
+                return false;
+            }
+            if (member.getStatus() == MemberStatus.Starting) { // NOTE(review): status checked/set is Starting while the logs say "started" - confirm intended
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Member already started: [service] %s [cluster] %s [member] %s",
+                            event.getServiceName(),
+                            event.getClusterId(),
+                            event.getMemberId()));
+                }
+                return false;
+            }
+
+            // Apply changes to the topology: mark the member with the Starting status
+            member.setStatus(MemberStatus.Starting);
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member started: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+
+            // Notify event listeners, then return true because the event was applied
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                // Not this processor's message type: delegate to the next processor and return its result.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
deleted file mode 100644
index 7f4428d..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.MemberSuspendedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class MemberSuspendedEventProcessor extends MessageProcessor {
-
-    private static final Log log = LogFactory.getLog(MemberSuspendedEventProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology) object;
-
-        if (MemberSuspendedEvent.class.getName().equals(type)) {
-            // Parse complete message and build event
-            MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(message, MemberSuspendedEvent.class);
-
-            // Apply service filter
-            if(ServiceFilter.getInstance().isActive()) {
-                if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return true;
-                }
-            }
-
-            // Apply cluster filter
-            if(ClusterFilter.getInstance().isActive()) {
-                if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return true;
-                }
-            }
-
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                throw new RuntimeException(String.format("Service does not exist: [service] %s",
-                        event.getServiceName()));
-            }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                        event.getServiceName(), event.getClusterId()));
-            }
-            Member member = cluster.getMember(event.getMemberId());
-            if (member == null) {
-                throw new RuntimeException(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-            if (member.getStatus() == MemberStatus.Suspended) {
-                throw new RuntimeException(String.format("Member already suspended: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-
-            // Apply changes to the topology
-            member.setStatus(MemberStatus.Suspended);
-
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Member suspended: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
new file mode 100644
index 0000000..e72d62c
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.MemberStatus;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.MemberSuspendedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+public class MemberSuspendedMessageProcessor extends MessageProcessor { // Chain link that handles MemberSuspendedEvent messages
+
+    private static final Log log = LogFactory.getLog(MemberSuspendedMessageProcessor.class);
+    private MessageProcessor nextProcessor; // Next link in the chain; null until setNext() is called
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (MemberSuspendedEvent.class.getName().equals(type)) {
+            // Ignore the event (return false) while the topology has not been initialized
+            if (!topology.isInitialized())
+                return false;
+
+            // Build the event object from the message body via Util.jsonToObject
+            MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(message, MemberSuspendedEvent.class);
+
+            // Apply service filter (only consulted when the filter is active)
+            if (ServiceFilter.getInstance().isActive()) {
+                if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+                    // Service is excluded: do not update topology or fire event, return false
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+                    }
+                    return false;
+                }
+            }
+
+            // Apply cluster filter (only consulted when the filter is active)
+            if (ClusterFilter.getInstance().isActive()) {
+                if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+                    // Cluster is excluded: do not update topology or fire event, return false
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                    }
+                    return false;
+                }
+            }
+
+            // Validate event against the existing topology; each failed lookup logs a warning and returns false
+            Service service = topology.getService(event.getServiceName());
+            if (service == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Service does not exist: [service] %s",
+                            event.getServiceName()));
+                }
+                return false;
+            }
+            Cluster cluster = service.getCluster(event.getClusterId());
+            if (cluster == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                            event.getServiceName(), event.getClusterId()));
+                }
+                return false;
+            }
+            Member member = cluster.getMember(event.getMemberId());
+            if (member == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+                            event.getServiceName(),
+                            event.getClusterId(),
+                            event.getMemberId()));
+                }
+                return false;
+            }
+            if (member.getStatus() == MemberStatus.Suspended) { // Duplicate event: member is already Suspended, nothing to apply
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Member already suspended: [service] %s [cluster] %s [member] %s",
+                            event.getServiceName(),
+                            event.getClusterId(),
+                            event.getMemberId()));
+                }
+                return false;
+            }
+
+            // Apply changes to the topology: mark the member as Suspended
+            member.setStatus(MemberStatus.Suspended);
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member suspended: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+
+            notifyEventListeners(event); // Notify event listeners, then return true because the event was applied
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                // Not this processor's message type: delegate to the next processor and return its result.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
deleted file mode 100644
index b5aac6d..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class MemberTerminatedEventProcessor extends MessageProcessor {
-
-    private static final Log log = LogFactory.getLog(MemberTerminatedEventProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology) object;
-
-        if (MemberTerminatedEvent.class.getName().equals(type)) {
-            // Parse complete message and build event
-            MemberTerminatedEvent event = (MemberTerminatedEvent) Util.jsonToObject(message, MemberTerminatedEvent.class);
-
-            // Apply service filter
-            if(ServiceFilter.getInstance().isActive()) {
-                if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return true;
-                }
-            }
-
-            // Apply cluster filter
-            if(ClusterFilter.getInstance().isActive()) {
-                if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return true;
-                }
-            }
-
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                throw new RuntimeException(String.format("Service does not exist: [service] %s", event.getServiceName()));
-            }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                        event.getServiceName(), event.getClusterId()));
-            }
-            Member member = cluster.getMember(event.getMemberId());
-            if (member == null) {
-                throw new RuntimeException(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-            if (member.getStatus() == MemberStatus.Terminated) {
-                throw new RuntimeException(String.format("Member already terminated: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-
-            // Apply changes to the topology
-            member.setStatus(MemberStatus.Terminated);
-
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Member terminated: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-
-            notifyEventListeners(event);
-            return true;
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
new file mode 100644
index 0000000..f8d52d6
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.MemberStatus;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+public class MemberTerminatedMessageProcessor extends MessageProcessor { // Chain link that handles MemberTerminatedEvent messages
+
+    private static final Log log = LogFactory.getLog(MemberTerminatedMessageProcessor.class);
+    private MessageProcessor nextProcessor; // Next link in the chain; null until setNext() is called
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (MemberTerminatedEvent.class.getName().equals(type)) {
+            // Ignore the event (return false) while the topology has not been initialized
+            if (!topology.isInitialized())
+                return false;
+
+            // Build the event object from the message body via Util.jsonToObject
+            MemberTerminatedEvent event = (MemberTerminatedEvent) Util.jsonToObject(message, MemberTerminatedEvent.class);
+
+            // Apply service filter (only consulted when the filter is active)
+            if (ServiceFilter.getInstance().isActive()) {
+                if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+                    // Service is excluded: do not update topology or fire event, return false
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+                    }
+                    return false;
+                }
+            }
+
+            // Apply cluster filter (only consulted when the filter is active)
+            if (ClusterFilter.getInstance().isActive()) {
+                if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+                    // Cluster is excluded: do not update topology or fire event, return false
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                    }
+                    return false;
+                }
+            }
+
+            // Validate event against the existing topology; each failed lookup logs a warning and returns false
+            Service service = topology.getService(event.getServiceName());
+            if (service == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
+                }
+                return false;
+            }
+            Cluster cluster = service.getCluster(event.getClusterId());
+            if (cluster == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                            event.getServiceName(), event.getClusterId()));
+                }
+                return false;
+            }
+            Member member = cluster.getMember(event.getMemberId());
+            if (member == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+                            event.getServiceName(),
+                            event.getClusterId(),
+                            event.getMemberId()));
+                }
+                return false;
+            }
+            if (member.getStatus() == MemberStatus.Terminated) { // Duplicate event: member is already Terminated, nothing to apply
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Member already terminated: [service] %s [cluster] %s [member] %s",
+                            event.getServiceName(),
+                            event.getClusterId(),
+                            event.getMemberId()));
+                }
+                return false;
+            }
+
+            // Apply changes to the topology: mark the member as Terminated
+            member.setStatus(MemberStatus.Terminated);
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member terminated: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+
+            notifyEventListeners(event); // Notify event listeners, then return true because the event was applied
+            return true;
+
+        } else {
+            if (nextProcessor != null) {
+                // Not this processor's message type: delegate to the next processor and return its result.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
deleted file mode 100644
index f2c8059..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ServiceCreatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class ServiceCreatedEventProcessor extends MessageProcessor {
-
-    private static final Log log = LogFactory.getLog(ServiceCreatedEventProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology)object;
-
-        if (ServiceCreatedEvent.class.getName().equals(type)) {
-            // Parse complete message and build event
-            ServiceCreatedEvent event = (ServiceCreatedEvent) Util.jsonToObject(message, ServiceCreatedEvent.class);
-
-            // Apply service filter
-            if(ServiceFilter.getInstance().isActive()) {
-                if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return true;
-                }
-            }
-
-            // Validate event against the existing topology
-            if (topology.serviceExists(event.getServiceName())) {
-                throw new RuntimeException(String.format("Service already created: [service] %s", event.getServiceName()));
-            }
-
-            // Apply changes to the topology
-            Service service = new Service(event.getServiceName());
-            service.addPorts(event.getPorts());
-            topology.addService(service);
-
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Service created: [service] %s", event.getServiceName()));
-            }
-
-            notifyEventListeners(event);
-            return true;
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
new file mode 100644
index 0000000..ff768dc
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ServiceCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+/** Adds newly created services to the topology; other message types go to the next processor. */
+public class ServiceCreatedMessageProcessor extends MessageProcessor {
+
+    private static final Log log = LogFactory.getLog(ServiceCreatedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    /**
+     * @return true if the service was added; false if the event was skipped; else the next processor's result
+     */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (ServiceCreatedEvent.class.getName().equals(type)) {
+            // Return if topology has not been initialized
+            if (!topology.isInitialized()) {
+                return false;
+            }
+
+            // Parse complete message and build event
+            ServiceCreatedEvent event = (ServiceCreatedEvent) Util.jsonToObject(message, ServiceCreatedEvent.class);
+
+            // Apply service filter: an excluded service neither updates the topology nor fires an event
+            if (ServiceFilter.getInstance().isActive() && ServiceFilter.getInstance().excluded(event.getServiceName())) {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+                }
+                return false;
+            }
+
+            // Validate event against the existing topology: a service that is already present is skipped
+            if (topology.serviceExists(event.getServiceName())) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Service already created: [service] %s", event.getServiceName()));
+                }
+                return false;
+            }
+
+            // Apply changes to the topology
+            Service service = new Service(event.getServiceName());
+            service.addPorts(event.getPorts());
+            topology.addService(service);
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Service created: [service] %s", event.getServiceName()));
+            }
+
+            notifyEventListeners(event);
+            return true;
+        } else if (nextProcessor != null) {
+            // ask the next processor to take care of the message.
+            return nextProcessor.process(type, message, topology);
+        } else {
+            throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
deleted file mode 100644
index 3859f60..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class ServiceRemovedEventProcessor extends MessageProcessor {
-
-    private static final Log log = LogFactory.getLog(ServiceRemovedEventProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology)object;
-
-        if (ServiceRemovedEvent.class.getName().equals(type)) {
-            // Parse complete message and build event
-            ServiceRemovedEvent event = (ServiceRemovedEvent) Util.jsonToObject(message, ServiceRemovedEvent.class);
-
-            // Apply service filter
-            if(ServiceFilter.getInstance().isActive()) {
-                if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return true;
-                }
-            }
-
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                throw new RuntimeException(String.format("Service does not exist: [service] %s",
-                        event.getServiceName()));
-            }
-
-            // Apply changes to the topology
-            topology.removeService(service);
-
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Service removed: [service] %s", event.getServiceName()));
-            }
-
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
new file mode 100644
index 0000000..5f31ec2
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+/** Message processor that removes a service from the topology on a {@link ServiceRemovedEvent}. */
+public class ServiceRemovedMessageProcessor extends MessageProcessor {
+
+    private static final Log log = LogFactory.getLog(ServiceRemovedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    /**
+     * Handles service removed messages and delegates every other message type to the next processor.
+     *
+     * @return true if the service was removed and listeners were notified; false if the message
+     *         was skipped (topology not initialized, service excluded or not in the topology)
+     */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (!ServiceRemovedEvent.class.getName().equals(type)) {
+            // Not a service removed message: hand it over to the next processor, if there is one
+            if (nextProcessor == null) {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+            return nextProcessor.process(type, message, topology);
+        }
+
+        // Nothing is applied until the topology has been initialized
+        if (!topology.isInitialized()) {
+            return false;
+        }
+
+        // Build the event from the message body
+        ServiceRemovedEvent removedEvent = (ServiceRemovedEvent) Util.jsonToObject(message, ServiceRemovedEvent.class);
+
+        // An excluded service neither updates the topology nor fires an event
+        if (ServiceFilter.getInstance().isActive() && ServiceFilter.getInstance().excluded(removedEvent.getServiceName())) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Service is excluded: [service] %s", removedEvent.getServiceName()));
+            }
+            return false;
+        }
+
+        // The service has to be present in the topology to be removed
+        Service existingService = topology.getService(removedEvent.getServiceName());
+        if (existingService == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s", removedEvent.getServiceName()));
+            }
+            return false;
+        }
+
+        // Apply changes to the topology
+        topology.removeService(existingService);
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Service removed: [service] %s", removedEvent.getServiceName()));
+        }
+        // Notify event listeners
+        notifyEventListeners(removedEvent);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessorChain.java
deleted file mode 100644
index 75e395d..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessorChain.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.listener.EventListener;
-import org.apache.stratos.messaging.listener.topology.*;
-import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
-
-/**
- * Defines default topology message processor chain.
- */
-public class TopologyEventProcessorChain extends MessageProcessorChain {
-    private static final Log log = LogFactory.getLog(TopologyEventProcessorChain.class);
-
-    private ServiceCreatedEventProcessor serviceCreatedEventProcessor;
-    private ServiceRemovedEventProcessor serviceRemovedEventProcessor;
-    private ClusterCreatedEventProcessor clusterCreatedEventProcessor;
-    private ClusterRemovedEventProcessor clusterRemovedEventProcessor;
-    private InstanceSpawnedEventProcessor instanceSpawnedEventProcessor;
-    private MemberStartedEventProcessor memberStartedEventProcessor;
-    private MemberActivatedEventProcessor memberActivatedEventProcessor;
-    private MemberSuspendedEventProcessor memberSuspendedEventProcessor;
-    private MemberTerminatedEventProcessor memberTerminatedEventProcessor;
-
-    public void initialize() {
-        // Add topology event processors
-        serviceCreatedEventProcessor = new ServiceCreatedEventProcessor();
-        add(serviceCreatedEventProcessor);
-
-        serviceRemovedEventProcessor = new ServiceRemovedEventProcessor();
-        add(serviceRemovedEventProcessor);
-
-        clusterCreatedEventProcessor = new ClusterCreatedEventProcessor();
-        add(clusterCreatedEventProcessor);
-
-        clusterRemovedEventProcessor = new ClusterRemovedEventProcessor();
-        add(clusterRemovedEventProcessor);
-
-        instanceSpawnedEventProcessor = new InstanceSpawnedEventProcessor();
-        add(instanceSpawnedEventProcessor);
-
-        memberStartedEventProcessor = new MemberStartedEventProcessor();
-        add(memberStartedEventProcessor);
-
-        memberActivatedEventProcessor = new MemberActivatedEventProcessor();
-        add(memberActivatedEventProcessor);
-
-        memberSuspendedEventProcessor = new MemberSuspendedEventProcessor();
-        add(memberSuspendedEventProcessor);
-
-        memberTerminatedEventProcessor = new MemberTerminatedEventProcessor();
-        add(memberTerminatedEventProcessor);
-
-        add(new CompleteTopologyEventIgnoreProcessor());
-
-        if(log.isDebugEnabled()) {
-            log.debug("Topology message processor chain initialized");
-        }
-    }
-
-    public void addEventListener(EventListener eventListener) {
-        if(eventListener instanceof ClusterCreatedEventListener) {
-            clusterCreatedEventProcessor.addEventListener(eventListener);
-        }
-        else if(eventListener instanceof ClusterRemovedEventListener) {
-            clusterRemovedEventProcessor.addEventListener(eventListener);
-        }
-        else if(eventListener instanceof InstanceSpawnedEventListener) {
-            instanceSpawnedEventProcessor.addEventListener(eventListener);
-        }
-        else if(eventListener instanceof MemberActivatedEventListener) {
-            memberActivatedEventProcessor.addEventListener(eventListener);
-        }
-        else if(eventListener instanceof MemberStartedEventListener) {
-            memberStartedEventProcessor.addEventListener(eventListener);
-        }
-        else if(eventListener instanceof MemberSuspendedEventListener) {
-            memberSuspendedEventProcessor.addEventListener(eventListener);
-        }
-        else if(eventListener instanceof MemberTerminatedEventListener) {
-            memberTerminatedEventProcessor.addEventListener(eventListener);
-        }
-        else if(eventListener instanceof ServiceCreatedEventListener) {
-            serviceCreatedEventProcessor.addEventListener(eventListener);
-        }
-        else if(eventListener instanceof ServiceRemovedEventListener) {
-            serviceRemovedEventProcessor.addEventListener(eventListener);
-        }
-        else {
-            throw new RuntimeException("Unknown event listener");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
new file mode 100644
index 0000000..1281761
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.listener.topology.*;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+
+/**
+ * Default topology message processor chain. It holds one message processor per topology
+ * message type and hands each event listener to the processor matching its listener interface.
+ */
+public class TopologyMessageProcessorChain extends MessageProcessorChain {
+    private static final Log log = LogFactory.getLog(TopologyMessageProcessorChain.class);
+
+    private CompleteTopologyMessageProcessor completeTopologyMessageProcessor;
+    private ServiceCreatedMessageProcessor serviceCreatedMessageProcessor;
+    private ServiceRemovedMessageProcessor serviceRemovedMessageProcessor;
+    private ClusterCreatedMessageProcessor clusterCreatedMessageProcessor;
+    private ClusterRemovedMessageProcessor clusterRemovedMessageProcessor;
+    private InstanceSpawnedMessageProcessor instanceSpawnedMessageProcessor;
+    private MemberStartedMessageProcessor memberStartedMessageProcessor;
+    private MemberActivatedMessageProcessor memberActivatedMessageProcessor;
+    private MemberSuspendedMessageProcessor memberSuspendedMessageProcessor;
+    private MemberTerminatedMessageProcessor memberTerminatedMessageProcessor;
+
+    /**
+     * Instantiates every topology message processor and adds each of them to the chain.
+     */
+    public void initialize() {
+        completeTopologyMessageProcessor = new CompleteTopologyMessageProcessor();
+        serviceCreatedMessageProcessor = new ServiceCreatedMessageProcessor();
+        serviceRemovedMessageProcessor = new ServiceRemovedMessageProcessor();
+        clusterCreatedMessageProcessor = new ClusterCreatedMessageProcessor();
+        clusterRemovedMessageProcessor = new ClusterRemovedMessageProcessor();
+        instanceSpawnedMessageProcessor = new InstanceSpawnedMessageProcessor();
+        memberStartedMessageProcessor = new MemberStartedMessageProcessor();
+        memberActivatedMessageProcessor = new MemberActivatedMessageProcessor();
+        memberSuspendedMessageProcessor = new MemberSuspendedMessageProcessor();
+        memberTerminatedMessageProcessor = new MemberTerminatedMessageProcessor();
+
+        // Add the processors to the chain; the complete topology processor goes first
+        add(completeTopologyMessageProcessor);
+        add(serviceCreatedMessageProcessor);
+        add(serviceRemovedMessageProcessor);
+        add(clusterCreatedMessageProcessor);
+        add(clusterRemovedMessageProcessor);
+        add(instanceSpawnedMessageProcessor);
+        add(memberStartedMessageProcessor);
+        add(memberActivatedMessageProcessor);
+        add(memberSuspendedMessageProcessor);
+        add(memberTerminatedMessageProcessor);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Topology message processor chain initialized");
+        }
+    }
+
+    /**
+     * Registers the listener with the first processor whose listener interface it implements.
+     * The processors are created by {@link #initialize()}; unknown listener types are rejected.
+     */
+    public void addEventListener(EventListener eventListener) {
+        if (eventListener instanceof CompleteTopologyEventListener) {
+            completeTopologyMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof ClusterCreatedEventListener) {
+            clusterCreatedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof ClusterRemovedEventListener) {
+            clusterRemovedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof InstanceSpawnedEventListener) {
+            instanceSpawnedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof MemberActivatedEventListener) {
+            memberActivatedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof MemberStartedEventListener) {
+            memberStartedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof MemberSuspendedEventListener) {
+            memberSuspendedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof MemberTerminatedEventListener) {
+            memberTerminatedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof ServiceCreatedEventListener) {
+            serviceCreatedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof ServiceRemovedEventListener) {
+            serviceRemovedMessageProcessor.addEventListener(eventListener);
+        } else {
+            throw new RuntimeException("Unknown event listener");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
index 5afb54b..6f672c4 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
@@ -22,7 +22,6 @@ import javax.jms.TextMessage;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
 import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
 import org.apache.stratos.messaging.message.processor.topology.*;
 import org.apache.stratos.messaging.util.Constants;
@@ -31,59 +30,26 @@ import org.apache.stratos.messaging.util.Constants;
 /**
  * Implements logic for processing topology event messages based on a given
  * topology process chain.
- *
- * Functionality:
- * - Wait for the complete topology event.
- * - Process messages using the given message processor chain.
  */
 public class TopologyEventMessageDelegator implements Runnable {
 
     private static final Log log = LogFactory.getLog(TopologyEventMessageDelegator.class);
-    private CompleteTopologyEventProcessor completeTopEvMsgProcessor;
     private MessageProcessorChain processorChain;
     private boolean terminated;
 
     public TopologyEventMessageDelegator() {
-        this.completeTopEvMsgProcessor = new CompleteTopologyEventProcessor();
-        this.processorChain = new TopologyEventProcessorChain();
+        this.processorChain = new TopologyMessageProcessorChain();
     }
 
     public TopologyEventMessageDelegator(MessageProcessorChain processorChain) {
-        this.completeTopEvMsgProcessor = new CompleteTopologyEventProcessor();
         this.processorChain = processorChain;
     }
 
-    public void addCompleteTopologyEventListener(CompleteTopologyEventListener eventListener) {
-        completeTopEvMsgProcessor.addEventListener(eventListener);
-    }
-
-    public void removeCompleteTopologyEventListener(CompleteTopologyEventListener eventListener) {
-        completeTopEvMsgProcessor.removeEventListener(eventListener);
-    }
-
     @Override
     public void run() {
         try {
-            if(log.isInfoEnabled()) {
+            if (log.isInfoEnabled()) {
                 log.info("Topology event message delegator started");
-                log.info("Waiting for the complete topology event message...");
-            }
-            while (!terminated) {
-                try {
-                    // First take the complete topology event
-                    TextMessage message = TopologyEventQueue.getInstance().take();
-                    // Retrieve the header
-                    String type = message.getStringProperty(Constants.EVENT_CLASS_NAME);
-                    // Retrieve the actual message
-                    String json = message.getText();
-
-                    if (completeTopEvMsgProcessor.process(type, json, TopologyManager.getTopology())) {
-                        break;
-                    }
-
-                } catch (Exception e) {
-                    throw new RuntimeException("Failed to retrieve the complete topology", e);
-                }
             }
 
             while (!terminated) {
@@ -92,6 +58,7 @@ public class TopologyEventMessageDelegator implements Runnable {
 
                     // Retrieve the header
                     String type = message.getStringProperty(Constants.EVENT_CLASS_NAME);
+
                     // Retrieve the actual message
                     String json = message.getText();
 
@@ -101,20 +68,20 @@ public class TopologyEventMessageDelegator implements Runnable {
 
                     try {
                         TopologyManager.acquireWriteLock();
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("Delegating : %s", type));
+                        }
                         processorChain.process(type, json, TopologyManager.getTopology());
                     } finally {
                         TopologyManager.releaseWriteLock();
                     }
 
                 } catch (Exception e) {
-                    String error = "Failed to retrieve topology event message";
-                    log.error(error, e);
-                    throw new RuntimeException(error, e);
+                    log.error("Failed to retrieve topology event message", e);
                 }
             }
-        }
-        catch (Exception e) {
-            if(log.isErrorEnabled()) {
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
                 log.error("Topology event message delegator failed", e);
             }
         }


[2/2] git commit: Moved complete topology message processor to topology message processor chain and renamed event processors to message processors

Posted by im...@apache.org.
Moved complete topology message processor to topology message processor chain and renamed event processors to message processors


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/f45707a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/f45707a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/f45707a9

Branch: refs/heads/master
Commit: f45707a965b47f77d0c59c6c8b32d3a0c9447d67
Parents: 04add84
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Dec 5 15:49:00 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Dec 5 15:49:00 2013 +0530

----------------------------------------------------------------------
 .../processors/AutoscalerTopologyReceiver.java  |  17 +--
 .../extension/api/LoadBalancerExtension.java    |  17 +--
 .../balancer/LoadBalancerTopologyReceiver.java  |  15 +-
 .../messaging/domain/topology/Topology.java     |   9 ++
 .../messaging/event/EventObservable.java        |  10 ++
 .../topology/ClusterCreatedEventProcessor.java  | 113 --------------
 .../ClusterCreatedMessageProcessor.java         | 122 +++++++++++++++
 .../topology/ClusterRemovedEventProcessor.java  | 110 --------------
 .../ClusterRemovedMessageProcessor.java         | 119 +++++++++++++++
 .../CompleteTopologyEventIgnoreProcessor.java   |  58 -------
 .../CompleteTopologyEventProcessor.java         |  97 ------------
 .../CompleteTopologyMessageProcessor.java       | 104 +++++++++++++
 .../topology/InstanceSpawnedEventProcessor.java | 113 --------------
 .../InstanceSpawnedMessageProcessor.java        | 126 +++++++++++++++
 .../topology/MemberActivatedEventProcessor.java | 134 ----------------
 .../MemberActivatedMessageProcessor.java        | 152 +++++++++++++++++++
 .../topology/MemberStartedEventProcessor.java   | 121 ---------------
 .../topology/MemberStartedMessageProcessor.java | 137 +++++++++++++++++
 .../topology/MemberSuspendedEventProcessor.java | 120 ---------------
 .../MemberSuspendedMessageProcessor.java        | 136 +++++++++++++++++
 .../MemberTerminatedEventProcessor.java         | 120 ---------------
 .../MemberTerminatedMessageProcessor.java       | 136 +++++++++++++++++
 .../topology/ServiceCreatedEventProcessor.java  |  85 -----------
 .../ServiceCreatedMessageProcessor.java         |  91 +++++++++++
 .../topology/ServiceRemovedEventProcessor.java  |  86 -----------
 .../ServiceRemovedMessageProcessor.java         |  93 ++++++++++++
 .../topology/TopologyEventProcessorChain.java   | 112 --------------
 .../topology/TopologyMessageProcessorChain.java | 107 +++++++++++++
 .../topology/TopologyEventMessageDelegator.java |  51 ++-----
 29 files changed, 1369 insertions(+), 1342 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java
index 9a9588c..ab77a79 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java
@@ -21,7 +21,6 @@ package org.apache.stratos.autoscaler.topology.processors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.AutoscalerContext;
 import org.apache.stratos.autoscaler.ClusterContext;
 import org.apache.stratos.autoscaler.ClusterMonitor;
 import org.apache.stratos.autoscaler.PartitionContext;
@@ -40,10 +39,9 @@ import org.apache.stratos.messaging.listener.topology.MemberStartedEventListener
 import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener;
 import org.apache.stratos.messaging.listener.topology.ServiceRemovedEventListener;
 import org.apache.stratos.messaging.event.topology.*;
-import org.apache.stratos.messaging.message.processor.topology.TopologyEventProcessorChain;
+import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
 
 import java.util.Collection;
 
@@ -82,9 +80,8 @@ public class AutoscalerTopologyReceiver implements Runnable {
     }
 
     private TopologyEventMessageDelegator createMessageDelegator() {
-        TopologyEventProcessorChain processorChain = createEventProcessorChain();
-        final TopologyEventMessageDelegator messageDelegator = new TopologyEventMessageDelegator(processorChain);
-        messageDelegator.addCompleteTopologyEventListener(new CompleteTopologyEventListener() {
+        TopologyMessageProcessorChain processorChain = createEventProcessorChain();
+        processorChain.addEventListener(new CompleteTopologyEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {
@@ -99,17 +96,15 @@ public class AutoscalerTopologyReceiver implements Runnable {
                 finally {
                     TopologyManager.releaseReadLock();
                 }
-                // Complete topology is only consumed once, remove listener
-                messageDelegator.removeCompleteTopologyEventListener(this);
             }
 
         });
-        return messageDelegator;
+        return new TopologyEventMessageDelegator(processorChain);
     }
 
-    private TopologyEventProcessorChain createEventProcessorChain() {
+    private TopologyMessageProcessorChain createEventProcessorChain() {
         // Listen to topology events that affect clusters
-        TopologyEventProcessorChain processorChain = new TopologyEventProcessorChain();
+        TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain();
         processorChain.addEventListener(new ClusterCreatedEventListener() {
             @Override
             protected void onEvent(Event event) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index 61781c9..95dc039 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -24,7 +24,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.load.balancer.common.topology.TopologyReceiver;
 import org.apache.stratos.messaging.event.Event;
 import org.apache.stratos.messaging.listener.topology.*;
-import org.apache.stratos.messaging.message.processor.topology.TopologyEventProcessorChain;
+import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 
@@ -74,9 +74,8 @@ public class LoadBalancerExtension implements Runnable {
     }
 
     private TopologyEventMessageDelegator createMessageDelegator() {
-        TopologyEventProcessorChain processorChain = createEventProcessorChain();
-        final TopologyEventMessageDelegator messageDelegator = new TopologyEventMessageDelegator(processorChain);
-        messageDelegator.addCompleteTopologyEventListener(new CompleteTopologyEventListener() {
+        TopologyMessageProcessorChain processorChain = createEventProcessorChain();
+        processorChain.addEventListener(new CompleteTopologyEventListener() {
 
             @Override
             protected void onEvent(Event event) {
@@ -87,10 +86,6 @@ public class LoadBalancerExtension implements Runnable {
                     // Start load balancer
                     loadBalancer.start();
                     loadBalancerStarted = true;
-
-                    // Complete topology event is only received once
-                    // Remove event listener
-                    messageDelegator.removeCompleteTopologyEventListener(this);
                 } catch (Exception e) {
                     if (log.isErrorEnabled()) {
                         log.error("Could not start load balancer", e);
@@ -99,11 +94,11 @@ public class LoadBalancerExtension implements Runnable {
                 }
             }
         });
-        return messageDelegator;
+        return new TopologyEventMessageDelegator(processorChain);
     }
 
-    private TopologyEventProcessorChain createEventProcessorChain() {
-        TopologyEventProcessorChain processorChain = new TopologyEventProcessorChain();
+    private TopologyMessageProcessorChain createEventProcessorChain() {
+        TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain();
         processorChain.addEventListener(new MemberActivatedEventListener() {
             @Override
             protected void onEvent(Event event) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
index fba24e7..2ecc5a2 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
@@ -31,7 +31,7 @@ import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListe
 import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
 import org.apache.stratos.messaging.listener.topology.ServiceRemovedEventListener;
 import org.apache.stratos.messaging.event.topology.*;
-import org.apache.stratos.messaging.message.processor.topology.TopologyEventProcessorChain;
+import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 
@@ -67,9 +67,8 @@ public class LoadBalancerTopologyReceiver implements Runnable {
     }
 
     private TopologyEventMessageDelegator createMessageDelegator() {
-        TopologyEventProcessorChain processorChain = createEventProcessorChain();
-        final TopologyEventMessageDelegator messageDelegator = new TopologyEventMessageDelegator(processorChain);
-        messageDelegator.addCompleteTopologyEventListener(new CompleteTopologyEventListener() {
+        TopologyMessageProcessorChain processorChain = createEventProcessorChain();
+        processorChain.addEventListener(new CompleteTopologyEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {
@@ -85,8 +84,6 @@ public class LoadBalancerTopologyReceiver implements Runnable {
                 finally {
                     TopologyManager.releaseReadLock();
                 }
-                // Complete topology is only consumed once, remove listener
-                messageDelegator.removeCompleteTopologyEventListener(this);
             }
 
             private boolean hasActiveMembers(Cluster cluster) {
@@ -98,12 +95,12 @@ public class LoadBalancerTopologyReceiver implements Runnable {
                 return false;
             }
         });
-        return messageDelegator;
+        return new TopologyEventMessageDelegator(processorChain);
     }
 
-    private TopologyEventProcessorChain createEventProcessorChain() {
+    private TopologyMessageProcessorChain createEventProcessorChain() {
         // Listen to topology events that affect clusters
-        TopologyEventProcessorChain processorChain = new TopologyEventProcessorChain();
+        TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain();
         processorChain.addEventListener(new MemberActivatedEventListener() {
             @Override
             protected void onEvent(Event event) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
index 57e99e8..2c503ee 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
@@ -31,6 +31,7 @@ public class Topology implements Serializable {
     private static final long serialVersionUID = -2453583548027402122L;
     // Key: Service.serviceName
     private Map<String, Service> serviceMap;
+    private boolean initialized;
 
     public Topology() {
         this.serviceMap = new HashMap<String, Service>();
@@ -69,4 +70,12 @@ public class Topology implements Serializable {
     public void clear() {
         this.serviceMap.clear();
     }
+
+    public void setInitialized(boolean initialized) {
+        this.initialized = initialized;
+    }
+
+    public boolean isInitialized() {
+        return initialized;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventObservable.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventObservable.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventObservable.java
index 528d426..0a5d18f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventObservable.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventObservable.java
@@ -19,6 +19,8 @@
 
 package org.apache.stratos.messaging.event;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.listener.EventListener;
 
 import java.util.Observable;
@@ -28,15 +30,23 @@ import java.util.Observable;
  */
 public abstract class EventObservable extends Observable {
 
+    private static final Log log = LogFactory.getLog(EventObservable.class);
+
     public void addEventListener(EventListener eventListener) {
         addObserver(eventListener);
     }
 
     public void removeEventListener(EventListener eventListener) {
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Removing event listeners: [event-listener] %s", eventListener.getClass().getName()));
+        }
         deleteObserver(eventListener);
     }
 
     public void notifyEventListeners(Event event) {
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Notifying event listeners: [event] %s", event.getClass().getName()));
+        }
         setChanged();
         notifyObservers(event);
     }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
deleted file mode 100644
index a4d09ba..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class ClusterCreatedEventProcessor extends MessageProcessor {
-
-    private static final Log log = LogFactory.getLog(ClusterCreatedEventProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology)object;
-
-        if (ClusterCreatedEvent.class.getName().equals(type)) {
-            // Parse complete message and build event
-            ClusterCreatedEvent event = (ClusterCreatedEvent) Util.jsonToObject(message, ClusterCreatedEvent.class);
-
-            // Apply service filter
-            if(ServiceFilter.getInstance().isActive()) {
-                if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return true;
-                }
-            }
-
-            // Apply cluster filter
-            if(ClusterFilter.getInstance().isActive()) {
-                if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return true;
-                }
-            }
-
-            // Validate event properties
-            if(StringUtils.isBlank(event.getHostName())) {
-                throw new RuntimeException("Hostname not found in cluster created event");
-            }
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                throw new RuntimeException(String.format("Service does not exist: [service] %s",
-                        event.getServiceName()));
-            }
-            if (service.clusterExists(event.getClusterId())) {
-                throw new RuntimeException(String.format("Cluster already exists in service: [service] %s [cluster] %s",
-                        event.getServiceName(),
-                        event.getClusterId()));
-            }
-
-            // Apply changes to the topology
-            Cluster cluster = new Cluster(event.getServiceName(), event.getClusterId(), event.getAutoscalingPolicyName());
-            cluster.addHostName(event.getHostName());
-            cluster.setTenantRange(event.getTenantRange());
-
-            service.addCluster(cluster);
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Cluster created: [service] %s [cluster] %s [hostname] %s",
-                         event.getServiceName(), event.getClusterId(), event.getHostName()));
-            }
-
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
new file mode 100644
index 0000000..8a8d394
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+public class ClusterCreatedMessageProcessor extends MessageProcessor {
+
+    private static final Log log = LogFactory.getLog(ClusterCreatedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (ClusterCreatedEvent.class.getName().equals(type)) {
+            // Return if topology has not been initialized
+            if (!topology.isInitialized())
+                return false;
+
+            // Parse complete message and build event
+            ClusterCreatedEvent event = (ClusterCreatedEvent) Util.jsonToObject(message, ClusterCreatedEvent.class);
+
+            // Apply service filter
+            if (ServiceFilter.getInstance().isActive()) {
+                if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+                    // Service is excluded, do not update topology or fire event
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+                    }
+                    return false;
+                }
+            }
+
+            // Apply cluster filter
+            if (ClusterFilter.getInstance().isActive()) {
+                if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+                    // Cluster is excluded, do not update topology or fire event
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                    }
+                    return false;
+                }
+            }
+
+            // Validate event properties
+            if (StringUtils.isBlank(event.getHostName())) {
+                throw new RuntimeException("Hostname not found in cluster created event");
+            }
+            // Validate event against the existing topology
+            Service service = topology.getService(event.getServiceName());
+            if (service == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Service does not exist: [service] %s",
+                            event.getServiceName()));
+                }
+                return false;
+            }
+            if (service.clusterExists(event.getClusterId())) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Cluster already exists in service: [service] %s [cluster] %s", event.getServiceName(),
+                            event.getClusterId()));
+                }
+                return false;
+            }
+
+            // Apply changes to the topology
+            Cluster cluster = new Cluster(event.getServiceName(), event.getClusterId(), event.getAutoscalingPolicyName());
+            cluster.addHostName(event.getHostName());
+            cluster.setTenantRange(event.getTenantRange());
+
+            service.addCluster(cluster);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Cluster created: [service] %s [cluster] %s [hostname] %s",
+                        event.getServiceName(), event.getClusterId(), event.getHostName()));
+            }
+
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
deleted file mode 100644
index 295923f..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class ClusterRemovedEventProcessor extends MessageProcessor {
-
-    private static final Log log = LogFactory.getLog(ClusterRemovedEventProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology)object;
-
-        if (ClusterRemovedEvent.class.getName().equals(type)) {
-            // Parse complete message and build event
-            ClusterRemovedEvent event = (ClusterRemovedEvent) Util.jsonToObject(message, ClusterRemovedEvent.class);
-
-            // Apply service filter
-            if(ServiceFilter.getInstance().isActive()) {
-                if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return true;
-                }
-            }
-
-            // Apply cluster filter
-            if(ClusterFilter.getInstance().isActive()) {
-                if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return true;
-                }
-            }
-
-            // Validate event properties
-            if(StringUtils.isBlank(event.getHostName())) {
-                throw new RuntimeException("Hostname not found in cluster removed event");
-            }
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                throw new RuntimeException(String.format("Service does not exist: [service] %s",
-                        event.getServiceName()));
-            }
-            if (!service.clusterExists(event.getClusterId())) {
-                throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s [hostname] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getHostName()));
-            }
-
-            // Apply changes to the topology
-            service.removeCluster(event.getClusterId());
-
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Cluster removed from service: [service] %s [cluster] %s",
-                         event.getServiceName(), event.getClusterId()));
-            }
-
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
new file mode 100644
index 0000000..b93273f
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * Topology message processor that handles {@link ClusterRemovedEvent} messages by removing the
+ * referenced cluster from its service in the given topology. Messages of any other type are
+ * delegated to the next processor in the chain.
+ */
+public class ClusterRemovedMessageProcessor extends MessageProcessor {
+
+    private static final Log log = LogFactory.getLog(ClusterRemovedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    /**
+     * Sets the processor that receives messages this processor does not handle.
+     *
+     * @param nextProcessor next processor in the chain, or null if this is the last one
+     */
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    /**
+     * Processes a cluster removed message against the given topology.
+     *
+     * @param type    class name of the event carried by the message
+     * @param message JSON body of the event
+     * @param object  the {@link Topology} to update
+     * @return true if the cluster was removed and listeners were notified; false if the
+     *         topology is not initialized, the service or cluster is filtered out, or the
+     *         service or cluster does not exist; otherwise the result of the next processor
+     * @throws RuntimeException if the event has a blank hostname, or if the message type is
+     *                          not handled here and there is no next processor
+     */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (ClusterRemovedEvent.class.getName().equals(type)) {
+            // Return if topology has not been initialized
+            if (!topology.isInitialized()) {
+                return false;
+            }
+
+            // Parse complete message and build event
+            ClusterRemovedEvent event = (ClusterRemovedEvent) Util.jsonToObject(message, ClusterRemovedEvent.class);
+
+            // Apply service filter
+            if (ServiceFilter.getInstance().isActive()) {
+                if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+                    // Service is excluded, do not update topology or fire event
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+                    }
+                    return false;
+                }
+            }
+
+            // Apply cluster filter
+            if (ClusterFilter.getInstance().isActive()) {
+                if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+                    // Cluster is excluded, do not update topology or fire event
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                    }
+                    return false;
+                }
+            }
+
+            // Validate event properties
+            if (StringUtils.isBlank(event.getHostName())) {
+                throw new RuntimeException("Hostname not found in cluster removed event");
+            }
+            // Validate event against the existing topology
+            Service service = topology.getService(event.getServiceName());
+            if (service == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
+                }
+                return false;
+            }
+            if (!service.clusterExists(event.getClusterId())) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s [hostname] %s",
+                            event.getServiceName(),
+                            event.getClusterId(),
+                            event.getHostName()));
+                }
+                return false;
+            }
+
+            // Apply changes to the topology
+            service.removeCluster(event.getClusterId());
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Cluster removed from service: [service] %s [cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
+            }
+
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventIgnoreProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventIgnoreProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventIgnoreProcessor.java
deleted file mode 100644
index 5e6d47c..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventIgnoreProcessor.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * An event processor to ignore complete topology event in a given message processor chain.
- */
-public class CompleteTopologyEventIgnoreProcessor extends MessageProcessor {
-
-    private static final Log log = LogFactory.getLog(CompleteTopologyEventIgnoreProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology)object;
-
-        if (CompleteTopologyEvent.class.getName().equals(type)) {
-            if(log.isDebugEnabled()) {
-                log.debug("Complete topology event ignored");
-            }
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            }
-            return false;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
deleted file mode 100644
index c61a2a5..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class CompleteTopologyEventProcessor extends MessageProcessor {
-
-    private static final Log log = LogFactory.getLog(CompleteTopologyEventProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology) object;
-
-        if (CompleteTopologyEvent.class.getName().equals(type)) {
-            // Parse complete message and build event
-            CompleteTopologyEvent event = (CompleteTopologyEvent) Util.jsonToObject(message, CompleteTopologyEvent.class);
-
-            // Apply service filter
-            if (ServiceFilter.getInstance().isActive()) {
-                // Add services included in service filter
-                for (Service service : event.getTopology().getServices()) {
-                    if (ServiceFilter.getInstance().included(service.getServiceName())) {
-                        topology.addService(service);
-                    }
-                    else {
-                        if(log.isDebugEnabled()) {
-                            log.debug(String.format("Service is excluded: [service] %s", service.getServiceName()));
-                        }
-                    }
-                }
-            } else {
-                // Add all services
-                topology.addServices(event.getTopology().getServices());
-            }
-
-            // Apply cluster filter
-            if (ClusterFilter.getInstance().isActive()) {
-                for (Service service : topology.getServices()) {
-                    for (Cluster cluster : service.getClusters()) {
-                        if (ClusterFilter.getInstance().excluded(cluster.getClusterId())) {
-                            service.removeCluster(cluster.getClusterId());
-                            if(log.isDebugEnabled()) {
-                                log.debug(String.format("Cluster is excluded: [cluster] %s", cluster.getClusterId()));
-                            }
-                        }
-                    }
-                }
-            }
-
-            if (log.isInfoEnabled()) {
-                log.info("Topology initialized");
-            }
-
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            }
-            return false;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
new file mode 100644
index 0000000..5cea64e
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Topology message processor that handles {@link CompleteTopologyEvent} messages by copying the
+ * services of the received topology into the given topology, applying the service and cluster
+ * filters, and marking the topology as initialized. Messages of any other type are delegated to
+ * the next processor in the chain.
+ */
+public class CompleteTopologyMessageProcessor extends MessageProcessor {
+
+    private static final Log log = LogFactory.getLog(CompleteTopologyMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    /**
+     * Sets the processor that receives messages this processor does not handle.
+     *
+     * @param nextProcessor next processor in the chain, or null if this is the last one
+     */
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    /**
+     * Initializes the given topology from a complete topology message.
+     *
+     * @param type    class name of the event carried by the message
+     * @param message JSON body of the event
+     * @param object  the {@link Topology} to populate
+     * @return true if the topology was populated and listeners were notified; false if the
+     *         topology was already initialized, or if the message type is not handled here
+     *         and there is no next processor; otherwise the result of the next processor
+     */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (CompleteTopologyEvent.class.getName().equals(type)) {
+            // Return if topology has already initialized
+            if (topology.isInitialized()) {
+                return false;
+            }
+
+            // Parse complete message and build event
+            CompleteTopologyEvent event = (CompleteTopologyEvent) Util.jsonToObject(message, CompleteTopologyEvent.class);
+
+            // Apply service filter
+            if (ServiceFilter.getInstance().isActive()) {
+                // Add services included in service filter
+                for (Service service : event.getTopology().getServices()) {
+                    if (ServiceFilter.getInstance().included(service.getServiceName())) {
+                        topology.addService(service);
+                    } else {
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("Service is excluded: [service] %s", service.getServiceName()));
+                        }
+                    }
+                }
+            } else {
+                // Add all services
+                topology.addServices(event.getTopology().getServices());
+            }
+
+            // Apply cluster filter
+            if (ClusterFilter.getInstance().isActive()) {
+                for (Service service : topology.getServices()) {
+                    // Collect excluded clusters first and remove them afterwards: removing while
+                    // iterating service.getClusters() could fail with a
+                    // ConcurrentModificationException if that collection is a live view.
+                    List<Cluster> excludedClusters = new ArrayList<Cluster>();
+                    for (Cluster cluster : service.getClusters()) {
+                        if (ClusterFilter.getInstance().excluded(cluster.getClusterId())) {
+                            excludedClusters.add(cluster);
+                        }
+                    }
+                    for (Cluster cluster : excludedClusters) {
+                        service.removeCluster(cluster.getClusterId());
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("Cluster is excluded: [cluster] %s", cluster.getClusterId()));
+                        }
+                    }
+                }
+            }
+
+            if (log.isInfoEnabled()) {
+                log.info("Topology initialized");
+            }
+
+            // Set topology initialized
+            topology.setInitialized(true);
+
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            }
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
deleted file mode 100644
index 0d2f0e2..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.*;
-import org.apache.stratos.messaging.event.topology.InstanceSpawnedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class InstanceSpawnedEventProcessor extends MessageProcessor {
-
-    private static final Log log = LogFactory.getLog(InstanceSpawnedEventProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology)object;
-
-        if (InstanceSpawnedEvent.class.getName().equals(type)) {
-            // Parse complete message and build event
-            InstanceSpawnedEvent event = (InstanceSpawnedEvent) Util.jsonToObject(message, InstanceSpawnedEvent.class);
-
-            // Apply service filter
-            if(ServiceFilter.getInstance().isActive()) {
-                if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return true;
-                }
-            }
-
-            // Apply cluster filter
-            if(ClusterFilter.getInstance().isActive()) {
-                if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return true;
-                }
-            }
-
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                throw new RuntimeException(String.format("Service does not exist: [service] %s",
-                        event.getServiceName()));
-            }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                        event.getServiceName(), event.getClusterId()));
-            }
-            if (cluster.memberExists(event.getMemberId())) {
-                throw new RuntimeException(String.format("Member already exists: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-
-            // Apply changes to the topology
-            Member member = new Member(event.getServiceName(), event.getClusterId(), event.getMemberId());
-            member.setStatus(MemberStatus.Created);
-            member.setPartitionId(event.getPartitionId());
-            cluster.addMember(member);
-
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Member created: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
new file mode 100644
index 0000000..f614782
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.event.topology.InstanceSpawnedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+public class InstanceSpawnedMessageProcessor extends MessageProcessor {
+
+    private static final Log log = LogFactory.getLog(InstanceSpawnedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+    // Handles InstanceSpawnedEvent messages (true = member added and listeners notified, false = event skipped); other types go to nextProcessor.
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object; // cast runs before the type check, so object must be a Topology for every message type
+
+        if (InstanceSpawnedEvent.class.getName().equals(type)) {
+            // Skip the event, without parsing the message, while the topology reports it is not initialized
+            if (!topology.isInitialized())
+                return false;
+
+            // Build the event object from the message body via Util.jsonToObject
+            InstanceSpawnedEvent event = (InstanceSpawnedEvent) Util.jsonToObject(message, InstanceSpawnedEvent.class);
+
+            // Apply service filter (only consulted when the filter is active)
+            if (ServiceFilter.getInstance().isActive()) {
+                if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+                    // Service is excluded, do not update topology or fire event
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+                    }
+                    return false;
+                }
+            }
+
+            // Apply cluster filter (only consulted when the filter is active)
+            if (ClusterFilter.getInstance().isActive()) {
+                if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+                    // Cluster is excluded, do not update topology or fire event
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                    }
+                    return false;
+                }
+            }
+
+            // Validate event against the existing topology: service and cluster must exist, member must not; a violation is logged as a warning and returns false
+            Service service = topology.getService(event.getServiceName());
+            if (service == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Service does not exist: [service] %s",
+                            event.getServiceName()));
+                }
+                return false;
+            }
+            Cluster cluster = service.getCluster(event.getClusterId());
+            if (cluster == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                            event.getServiceName(), event.getClusterId()));
+                }
+                return false;
+            }
+            if (cluster.memberExists(event.getMemberId())) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Member already exists: [service] %s [cluster] %s [member] %s",
+                            event.getServiceName(),
+                            event.getClusterId(),
+                            event.getMemberId()));
+                }
+                return false;
+            }
+
+            // Apply changes to the topology: add a new member in Created status, tagged with the event's partition id
+            Member member = new Member(event.getServiceName(), event.getClusterId(), event.getMemberId());
+            member.setStatus(MemberStatus.Created);
+            member.setPartitionId(event.getPartitionId());
+            cluster.addMember(member);
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member created: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+
+            // Notify event listeners (only reached after the topology has been updated)
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                // Not this processor's message type: ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
deleted file mode 100644
index def9545..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class MemberActivatedEventProcessor extends MessageProcessor {
-
-    private static final Log log = LogFactory.getLog(MemberActivatedEventProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology)object;
-
-        if (MemberActivatedEvent.class.getName().equals(type)) {
-            // Parse complete message and build event
-            MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(message, MemberActivatedEvent.class);
-
-            // Apply service filter
-            if(ServiceFilter.getInstance().isActive()) {
-                if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return true;
-                }
-            }
-
-            // Apply cluster filter
-            if(ClusterFilter.getInstance().isActive()) {
-                if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return true;
-                }
-            }
-
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                throw new RuntimeException(String.format("Service does not exist: [service] %s", event.getServiceName()));
-            }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                        event.getServiceName(), event.getClusterId()));
-            }
-            Member member = cluster.getMember(event.getMemberId());
-            if (member == null) {
-                throw new RuntimeException(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-            if (member.getStatus() == MemberStatus.Activated) {
-                throw new RuntimeException(String.format("Member already activated: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-            if((event.getMemberIp() == null) || event.getMemberIp().isEmpty()) {
-                throw new RuntimeException(String.format("No ip address found in member activated event: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-            if ((event.getPorts() == null) || (event.getPorts().size() == 0)) {
-                throw new RuntimeException(String.format("No ports found in member activated event: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-
-            // Apply changes to the topology
-            member.addPorts(event.getPorts());
-            member.setMemberIp(event.getMemberIp());
-            member.setStatus(MemberStatus.Activated);
-
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Member activated: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
new file mode 100644
index 0000000..e299b7d
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.MemberStatus;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+public class MemberActivatedMessageProcessor extends MessageProcessor {
+
+    private static final Log log = LogFactory.getLog(MemberActivatedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+    // Handles MemberActivatedEvent messages (true = member activated and listeners notified, false = event skipped); other types go to nextProcessor.
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object; // cast runs before the type check, so object must be a Topology for every message type
+
+        if (MemberActivatedEvent.class.getName().equals(type)) {
+            // Skip the event, without parsing the message, while the topology reports it is not initialized
+            if (!topology.isInitialized())
+                return false;
+
+            // Build the event object from the message body via Util.jsonToObject
+            MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(message, MemberActivatedEvent.class);
+
+            // Apply service filter (only consulted when the filter is active)
+            if (ServiceFilter.getInstance().isActive()) {
+                if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+                    // Service is excluded, do not update topology or fire event
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+                    }
+                    return false;
+                }
+            }
+
+            // Apply cluster filter (only consulted when the filter is active)
+            if (ClusterFilter.getInstance().isActive()) {
+                if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+                    // Cluster is excluded, do not update topology or fire event
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                    }
+                    return false;
+                }
+            }
+
+            // Validate event properties: unlike the topology checks below, a missing ip address or empty port list throws instead of returning false
+            if ((event.getMemberIp() == null) || event.getMemberIp().isEmpty()) {
+                throw new RuntimeException(String.format("No ip address found in member activated event: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+            if ((event.getPorts() == null) || (event.getPorts().size() == 0)) {
+                throw new RuntimeException(String.format("No ports found in member activated event: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+
+            // Validate event against the existing topology: service, cluster and member must exist and the member must not already be Activated; a violation is logged as a warning and returns false
+            Service service = topology.getService(event.getServiceName());
+            if (service == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
+                }
+                return false;
+            }
+            Cluster cluster = service.getCluster(event.getClusterId());
+            if (cluster == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                            event.getServiceName(), event.getClusterId()));
+                }
+                return false;
+            }
+            Member member = cluster.getMember(event.getMemberId());
+            if (member == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+                            event.getServiceName(),
+                            event.getClusterId(),
+                            event.getMemberId()));
+                }
+                return false;
+            }
+            if (member.getStatus() == MemberStatus.Activated) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Member already activated: [service] %s [cluster] %s [member] %s",
+                            event.getServiceName(),
+                            event.getClusterId(),
+                            event.getMemberId()));
+                }
+                return false;
+            }
+
+            // Apply changes to the topology: copy the event's ports and ip onto the member, then mark it Activated
+            member.addPorts(event.getPorts());
+            member.setMemberIp(event.getMemberIp());
+            member.setStatus(MemberStatus.Activated);
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member activated: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+
+            // Notify event listeners (only reached after the topology has been updated)
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                // Not this processor's message type: ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
deleted file mode 100644
index 849efcc..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class MemberStartedEventProcessor extends MessageProcessor {
-
-    private static final Log log = LogFactory.getLog(MemberStartedEventProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology) object;
-
-        if (MemberStartedEvent.class.getName().equals(type)) {
-            // Parse complete message and build event
-            MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(message, MemberStartedEvent.class);
-
-            // Apply service filter
-            if(ServiceFilter.getInstance().isActive()) {
-                if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return true;
-                }
-            }
-
-            // Apply cluster filter
-            if(ClusterFilter.getInstance().isActive()) {
-                if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return true;
-                }
-            }
-
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                throw new RuntimeException(String.format("Service does not exist: [service] %s",
-                        event.getServiceName()));
-            }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                        event.getServiceName(), event.getClusterId()));
-            }
-            Member member = cluster.getMember(event.getMemberId());
-            if (member == null) {
-                throw new RuntimeException(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-            if (member.getStatus() == MemberStatus.Starting) {
-                throw new RuntimeException(String.format("Member already started: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-
-            // Apply changes to the topology
-            member.setStatus(MemberStatus.Starting);
-
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Member started: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-}