You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/12/05 11:19:30 UTC

[2/2] git commit: Moved complete topology message processor to topology message processor chain and renamed event processors to message processors

Moved complete topology message processor to topology message processor chain and renamed event processors to message processors


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/f45707a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/f45707a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/f45707a9

Branch: refs/heads/master
Commit: f45707a965b47f77d0c59c6c8b32d3a0c9447d67
Parents: 04add84
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Dec 5 15:49:00 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Dec 5 15:49:00 2013 +0530

----------------------------------------------------------------------
 .../processors/AutoscalerTopologyReceiver.java  |  17 +--
 .../extension/api/LoadBalancerExtension.java    |  17 +--
 .../balancer/LoadBalancerTopologyReceiver.java  |  15 +-
 .../messaging/domain/topology/Topology.java     |   9 ++
 .../messaging/event/EventObservable.java        |  10 ++
 .../topology/ClusterCreatedEventProcessor.java  | 113 --------------
 .../ClusterCreatedMessageProcessor.java         | 122 +++++++++++++++
 .../topology/ClusterRemovedEventProcessor.java  | 110 --------------
 .../ClusterRemovedMessageProcessor.java         | 119 +++++++++++++++
 .../CompleteTopologyEventIgnoreProcessor.java   |  58 -------
 .../CompleteTopologyEventProcessor.java         |  97 ------------
 .../CompleteTopologyMessageProcessor.java       | 104 +++++++++++++
 .../topology/InstanceSpawnedEventProcessor.java | 113 --------------
 .../InstanceSpawnedMessageProcessor.java        | 126 +++++++++++++++
 .../topology/MemberActivatedEventProcessor.java | 134 ----------------
 .../MemberActivatedMessageProcessor.java        | 152 +++++++++++++++++++
 .../topology/MemberStartedEventProcessor.java   | 121 ---------------
 .../topology/MemberStartedMessageProcessor.java | 137 +++++++++++++++++
 .../topology/MemberSuspendedEventProcessor.java | 120 ---------------
 .../MemberSuspendedMessageProcessor.java        | 136 +++++++++++++++++
 .../MemberTerminatedEventProcessor.java         | 120 ---------------
 .../MemberTerminatedMessageProcessor.java       | 136 +++++++++++++++++
 .../topology/ServiceCreatedEventProcessor.java  |  85 -----------
 .../ServiceCreatedMessageProcessor.java         |  91 +++++++++++
 .../topology/ServiceRemovedEventProcessor.java  |  86 -----------
 .../ServiceRemovedMessageProcessor.java         |  93 ++++++++++++
 .../topology/TopologyEventProcessorChain.java   | 112 --------------
 .../topology/TopologyMessageProcessorChain.java | 107 +++++++++++++
 .../topology/TopologyEventMessageDelegator.java |  51 ++-----
 29 files changed, 1369 insertions(+), 1342 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java
index 9a9588c..ab77a79 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java
@@ -21,7 +21,6 @@ package org.apache.stratos.autoscaler.topology.processors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.AutoscalerContext;
 import org.apache.stratos.autoscaler.ClusterContext;
 import org.apache.stratos.autoscaler.ClusterMonitor;
 import org.apache.stratos.autoscaler.PartitionContext;
@@ -40,10 +39,9 @@ import org.apache.stratos.messaging.listener.topology.MemberStartedEventListener
 import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener;
 import org.apache.stratos.messaging.listener.topology.ServiceRemovedEventListener;
 import org.apache.stratos.messaging.event.topology.*;
-import org.apache.stratos.messaging.message.processor.topology.TopologyEventProcessorChain;
+import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
 
 import java.util.Collection;
 
@@ -82,9 +80,8 @@ public class AutoscalerTopologyReceiver implements Runnable {
     }
 
     private TopologyEventMessageDelegator createMessageDelegator() {
-        TopologyEventProcessorChain processorChain = createEventProcessorChain();
-        final TopologyEventMessageDelegator messageDelegator = new TopologyEventMessageDelegator(processorChain);
-        messageDelegator.addCompleteTopologyEventListener(new CompleteTopologyEventListener() {
+        TopologyMessageProcessorChain processorChain = createEventProcessorChain();
+        processorChain.addEventListener(new CompleteTopologyEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {
@@ -99,17 +96,15 @@ public class AutoscalerTopologyReceiver implements Runnable {
                 finally {
                     TopologyManager.releaseReadLock();
                 }
-                // Complete topology is only consumed once, remove listener
-                messageDelegator.removeCompleteTopologyEventListener(this);
             }
 
         });
-        return messageDelegator;
+        return new TopologyEventMessageDelegator(processorChain);
     }
 
-    private TopologyEventProcessorChain createEventProcessorChain() {
+    private TopologyMessageProcessorChain createEventProcessorChain() {
         // Listen to topology events that affect clusters
-        TopologyEventProcessorChain processorChain = new TopologyEventProcessorChain();
+        TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain();
         processorChain.addEventListener(new ClusterCreatedEventListener() {
             @Override
             protected void onEvent(Event event) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index 61781c9..95dc039 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -24,7 +24,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.load.balancer.common.topology.TopologyReceiver;
 import org.apache.stratos.messaging.event.Event;
 import org.apache.stratos.messaging.listener.topology.*;
-import org.apache.stratos.messaging.message.processor.topology.TopologyEventProcessorChain;
+import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 
@@ -74,9 +74,8 @@ public class LoadBalancerExtension implements Runnable {
     }
 
     private TopologyEventMessageDelegator createMessageDelegator() {
-        TopologyEventProcessorChain processorChain = createEventProcessorChain();
-        final TopologyEventMessageDelegator messageDelegator = new TopologyEventMessageDelegator(processorChain);
-        messageDelegator.addCompleteTopologyEventListener(new CompleteTopologyEventListener() {
+        TopologyMessageProcessorChain processorChain = createEventProcessorChain();
+        processorChain.addEventListener(new CompleteTopologyEventListener() {
 
             @Override
             protected void onEvent(Event event) {
@@ -87,10 +86,6 @@ public class LoadBalancerExtension implements Runnable {
                     // Start load balancer
                     loadBalancer.start();
                     loadBalancerStarted = true;
-
-                    // Complete topology event is only received once
-                    // Remove event listener
-                    messageDelegator.removeCompleteTopologyEventListener(this);
                 } catch (Exception e) {
                     if (log.isErrorEnabled()) {
                         log.error("Could not start load balancer", e);
@@ -99,11 +94,11 @@ public class LoadBalancerExtension implements Runnable {
                 }
             }
         });
-        return messageDelegator;
+        return new TopologyEventMessageDelegator(processorChain);
     }
 
-    private TopologyEventProcessorChain createEventProcessorChain() {
-        TopologyEventProcessorChain processorChain = new TopologyEventProcessorChain();
+    private TopologyMessageProcessorChain createEventProcessorChain() {
+        TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain();
         processorChain.addEventListener(new MemberActivatedEventListener() {
             @Override
             protected void onEvent(Event event) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
index fba24e7..2ecc5a2 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
@@ -31,7 +31,7 @@ import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListe
 import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
 import org.apache.stratos.messaging.listener.topology.ServiceRemovedEventListener;
 import org.apache.stratos.messaging.event.topology.*;
-import org.apache.stratos.messaging.message.processor.topology.TopologyEventProcessorChain;
+import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 
@@ -67,9 +67,8 @@ public class LoadBalancerTopologyReceiver implements Runnable {
     }
 
     private TopologyEventMessageDelegator createMessageDelegator() {
-        TopologyEventProcessorChain processorChain = createEventProcessorChain();
-        final TopologyEventMessageDelegator messageDelegator = new TopologyEventMessageDelegator(processorChain);
-        messageDelegator.addCompleteTopologyEventListener(new CompleteTopologyEventListener() {
+        TopologyMessageProcessorChain processorChain = createEventProcessorChain();
+        processorChain.addEventListener(new CompleteTopologyEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {
@@ -85,8 +84,6 @@ public class LoadBalancerTopologyReceiver implements Runnable {
                 finally {
                     TopologyManager.releaseReadLock();
                 }
-                // Complete topology is only consumed once, remove listener
-                messageDelegator.removeCompleteTopologyEventListener(this);
             }
 
             private boolean hasActiveMembers(Cluster cluster) {
@@ -98,12 +95,12 @@ public class LoadBalancerTopologyReceiver implements Runnable {
                 return false;
             }
         });
-        return messageDelegator;
+        return new TopologyEventMessageDelegator(processorChain);
     }
 
-    private TopologyEventProcessorChain createEventProcessorChain() {
+    private TopologyMessageProcessorChain createEventProcessorChain() {
         // Listen to topology events that affect clusters
-        TopologyEventProcessorChain processorChain = new TopologyEventProcessorChain();
+        TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain();
         processorChain.addEventListener(new MemberActivatedEventListener() {
             @Override
             protected void onEvent(Event event) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
index 57e99e8..2c503ee 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
@@ -31,6 +31,7 @@ public class Topology implements Serializable {
     private static final long serialVersionUID = -2453583548027402122L;
     // Key: Service.serviceName
     private Map<String, Service> serviceMap;
+    private boolean initialized;
 
     public Topology() {
         this.serviceMap = new HashMap<String, Service>();
@@ -69,4 +70,12 @@ public class Topology implements Serializable {
     public void clear() {
         this.serviceMap.clear();
     }
+
+    public void setInitialized(boolean initialized) {
+        this.initialized = initialized;
+    }
+
+    public boolean isInitialized() {
+        return initialized;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventObservable.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventObservable.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventObservable.java
index 528d426..0a5d18f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventObservable.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventObservable.java
@@ -19,6 +19,8 @@
 
 package org.apache.stratos.messaging.event;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.listener.EventListener;
 
 import java.util.Observable;
@@ -28,15 +30,23 @@ import java.util.Observable;
  */
 public abstract class EventObservable extends Observable {
 
+    private static final Log log = LogFactory.getLog(EventObservable.class);
+
     public void addEventListener(EventListener eventListener) {
         addObserver(eventListener);
     }
 
     public void removeEventListener(EventListener eventListener) {
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Removing event listeners: [event-listener] %s", eventListener.getClass().getName()));
+        }
         deleteObserver(eventListener);
     }
 
     public void notifyEventListeners(Event event) {
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Notifying event listeners: [event] %s", event.getClass().getName()));
+        }
         setChanged();
         notifyObservers(event);
     }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
deleted file mode 100644
index a4d09ba..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class ClusterCreatedEventProcessor extends MessageProcessor {
-
-    private static final Log log = LogFactory.getLog(ClusterCreatedEventProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology)object;
-
-        if (ClusterCreatedEvent.class.getName().equals(type)) {
-            // Parse complete message and build event
-            ClusterCreatedEvent event = (ClusterCreatedEvent) Util.jsonToObject(message, ClusterCreatedEvent.class);
-
-            // Apply service filter
-            if(ServiceFilter.getInstance().isActive()) {
-                if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return true;
-                }
-            }
-
-            // Apply cluster filter
-            if(ClusterFilter.getInstance().isActive()) {
-                if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return true;
-                }
-            }
-
-            // Validate event properties
-            if(StringUtils.isBlank(event.getHostName())) {
-                throw new RuntimeException("Hostname not found in cluster created event");
-            }
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                throw new RuntimeException(String.format("Service does not exist: [service] %s",
-                        event.getServiceName()));
-            }
-            if (service.clusterExists(event.getClusterId())) {
-                throw new RuntimeException(String.format("Cluster already exists in service: [service] %s [cluster] %s",
-                        event.getServiceName(),
-                        event.getClusterId()));
-            }
-
-            // Apply changes to the topology
-            Cluster cluster = new Cluster(event.getServiceName(), event.getClusterId(), event.getAutoscalingPolicyName());
-            cluster.addHostName(event.getHostName());
-            cluster.setTenantRange(event.getTenantRange());
-
-            service.addCluster(cluster);
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Cluster created: [service] %s [cluster] %s [hostname] %s",
-                         event.getServiceName(), event.getClusterId(), event.getHostName()));
-            }
-
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
new file mode 100644
index 0000000..8a8d394
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+public class ClusterCreatedMessageProcessor extends MessageProcessor {
+
+    private static final Log log = LogFactory.getLog(ClusterCreatedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (ClusterCreatedEvent.class.getName().equals(type)) {
+            // Return if topology has not been initialized
+            if (!topology.isInitialized())
+                return false;
+
+            // Parse complete message and build event
+            ClusterCreatedEvent event = (ClusterCreatedEvent) Util.jsonToObject(message, ClusterCreatedEvent.class);
+
+            // Apply service filter
+            if (ServiceFilter.getInstance().isActive()) {
+                if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+                    // Service is excluded, do not update topology or fire event
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+                    }
+                    return false;
+                }
+            }
+
+            // Apply cluster filter
+            if (ClusterFilter.getInstance().isActive()) {
+                if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+                    // Cluster is excluded, do not update topology or fire event
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                    }
+                    return false;
+                }
+            }
+
+            // Validate event properties
+            if (StringUtils.isBlank(event.getHostName())) {
+                throw new RuntimeException("Hostname not found in cluster created event");
+            }
+            // Validate event against the existing topology
+            Service service = topology.getService(event.getServiceName());
+            if (service == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Service does not exist: [service] %s",
+                            event.getServiceName()));
+                }
+                return false;
+            }
+            if (service.clusterExists(event.getClusterId())) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Cluster already exists in service: [service] %s [cluster] %s", event.getServiceName(),
+                            event.getClusterId()));
+                }
+                return false;
+            }
+
+            // Apply changes to the topology
+            Cluster cluster = new Cluster(event.getServiceName(), event.getClusterId(), event.getAutoscalingPolicyName());
+            cluster.addHostName(event.getHostName());
+            cluster.setTenantRange(event.getTenantRange());
+
+            service.addCluster(cluster);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Cluster created: [service] %s [cluster] %s [hostname] %s",
+                        event.getServiceName(), event.getClusterId(), event.getHostName()));
+            }
+
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
deleted file mode 100644
index 295923f..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class ClusterRemovedEventProcessor extends MessageProcessor {
-
-    private static final Log log = LogFactory.getLog(ClusterRemovedEventProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology)object;
-
-        if (ClusterRemovedEvent.class.getName().equals(type)) {
-            // Parse complete message and build event
-            ClusterRemovedEvent event = (ClusterRemovedEvent) Util.jsonToObject(message, ClusterRemovedEvent.class);
-
-            // Apply service filter
-            if(ServiceFilter.getInstance().isActive()) {
-                if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return true;
-                }
-            }
-
-            // Apply cluster filter
-            if(ClusterFilter.getInstance().isActive()) {
-                if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return true;
-                }
-            }
-
-            // Validate event properties
-            if(StringUtils.isBlank(event.getHostName())) {
-                throw new RuntimeException("Hostname not found in cluster removed event");
-            }
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                throw new RuntimeException(String.format("Service does not exist: [service] %s",
-                        event.getServiceName()));
-            }
-            if (!service.clusterExists(event.getClusterId())) {
-                throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s [hostname] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getHostName()));
-            }
-
-            // Apply changes to the topology
-            service.removeCluster(event.getClusterId());
-
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Cluster removed from service: [service] %s [cluster] %s",
-                         event.getServiceName(), event.getClusterId()));
-            }
-
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
new file mode 100644
index 0000000..b93273f
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+public class ClusterRemovedMessageProcessor extends MessageProcessor {
+    // Class purpose: applies ClusterRemovedEvent messages to the Topology passed to process() and notifies event listeners.
+    private static final Log log = LogFactory.getLog(ClusterRemovedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+        // Handle the message only when its type is the ClusterRemovedEvent class name; otherwise delegate (see else branch).
+        if (ClusterRemovedEvent.class.getName().equals(type)) {
+            // Ignore the event (return false) until the topology has been initialized
+            if (!topology.isInitialized())
+                return false;
+
+            // Build the event object from the message body via Util.jsonToObject
+            ClusterRemovedEvent event = (ClusterRemovedEvent) Util.jsonToObject(message, ClusterRemovedEvent.class);
+
+            // Apply service filter
+            if (ServiceFilter.getInstance().isActive()) {
+                if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+                    // Service is excluded, do not update topology or fire event
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+                    }
+                    return false;
+                }
+            }
+
+            // Apply cluster filter
+            if (ClusterFilter.getInstance().isActive()) {
+                if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+                    // Cluster is excluded, do not update topology or fire event
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                    }
+                    return false;
+                }
+            }
+
+            // Validate event properties: a blank hostname is rejected with a RuntimeException
+            if (StringUtils.isBlank(event.getHostName())) {
+                throw new RuntimeException("Hostname not found in cluster removed event");
+            }
+            // Validate event against the existing topology; an unknown service or cluster is logged at warn level and the event is dropped
+            Service service = topology.getService(event.getServiceName());
+            if (service == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
+                }
+                return false;
+            }
+            if (!service.clusterExists(event.getClusterId())) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s [hostname] %s",
+                            event.getServiceName(),
+                            event.getClusterId(),
+                            event.getHostName()));
+                }
+                return false;
+            }
+
+            // Apply changes to the topology
+            service.removeCluster(event.getClusterId());
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Cluster removed from service: [service] %s [cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
+            }
+
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+
+        } else {
+            if (nextProcessor != null) {
+                // Not this processor's message type: ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventIgnoreProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventIgnoreProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventIgnoreProcessor.java
deleted file mode 100644
index 5e6d47c..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventIgnoreProcessor.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * An event processor to ignore complete topology event in a given message processor chain.
- */
-public class CompleteTopologyEventIgnoreProcessor extends MessageProcessor {
-
-    private static final Log log = LogFactory.getLog(CompleteTopologyEventIgnoreProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology)object;
-
-        if (CompleteTopologyEvent.class.getName().equals(type)) {
-            if(log.isDebugEnabled()) {
-                log.debug("Complete topology event ignored");
-            }
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            }
-            return false;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
deleted file mode 100644
index c61a2a5..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class CompleteTopologyEventProcessor extends MessageProcessor {
-
-    private static final Log log = LogFactory.getLog(CompleteTopologyEventProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology) object;
-
-        if (CompleteTopologyEvent.class.getName().equals(type)) {
-            // Parse complete message and build event
-            CompleteTopologyEvent event = (CompleteTopologyEvent) Util.jsonToObject(message, CompleteTopologyEvent.class);
-
-            // Apply service filter
-            if (ServiceFilter.getInstance().isActive()) {
-                // Add services included in service filter
-                for (Service service : event.getTopology().getServices()) {
-                    if (ServiceFilter.getInstance().included(service.getServiceName())) {
-                        topology.addService(service);
-                    }
-                    else {
-                        if(log.isDebugEnabled()) {
-                            log.debug(String.format("Service is excluded: [service] %s", service.getServiceName()));
-                        }
-                    }
-                }
-            } else {
-                // Add all services
-                topology.addServices(event.getTopology().getServices());
-            }
-
-            // Apply cluster filter
-            if (ClusterFilter.getInstance().isActive()) {
-                for (Service service : topology.getServices()) {
-                    for (Cluster cluster : service.getClusters()) {
-                        if (ClusterFilter.getInstance().excluded(cluster.getClusterId())) {
-                            service.removeCluster(cluster.getClusterId());
-                            if(log.isDebugEnabled()) {
-                                log.debug(String.format("Cluster is excluded: [cluster] %s", cluster.getClusterId()));
-                            }
-                        }
-                    }
-                }
-            }
-
-            if (log.isInfoEnabled()) {
-                log.info("Topology initialized");
-            }
-
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            }
-            return false;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
new file mode 100644
index 0000000..5cea64e
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+public class CompleteTopologyMessageProcessor extends MessageProcessor {
+    // Class purpose: loads the services of a CompleteTopologyEvent into the Topology passed to process(), once, and marks it initialized.
+    private static final Log log = LogFactory.getLog(CompleteTopologyMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+        // Handle the message only when its type is the CompleteTopologyEvent class name; otherwise delegate (see else branch).
+        if (CompleteTopologyEvent.class.getName().equals(type)) {
+            // Return if topology has already been initialized, so later complete topology messages are ignored
+            if (topology.isInitialized()) {
+                return false;
+            }
+
+            // Build the event object from the message body via Util.jsonToObject
+            CompleteTopologyEvent event = (CompleteTopologyEvent) Util.jsonToObject(message, CompleteTopologyEvent.class);
+
+            // Apply service filter
+            if (ServiceFilter.getInstance().isActive()) {
+                // Add services included in service filter
+                for (Service service : event.getTopology().getServices()) {
+                    if (ServiceFilter.getInstance().included(service.getServiceName())) {
+                        topology.addService(service);
+                    } else {
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("Service is excluded: [service] %s", service.getServiceName()));
+                        }
+                    }
+                }
+            } else {
+                // Add all services
+                topology.addServices(event.getTopology().getServices());
+            }
+
+            // Apply cluster filter. NOTE(review): clusters are removed while iterating service.getClusters(); if that returns a live collection view this may throw ConcurrentModificationException - TODO confirm
+            if (ClusterFilter.getInstance().isActive()) {
+                for (Service service : topology.getServices()) {
+                    for (Cluster cluster : service.getClusters()) {
+                        if (ClusterFilter.getInstance().excluded(cluster.getClusterId())) {
+                            service.removeCluster(cluster.getClusterId());
+                            if (log.isDebugEnabled()) {
+                                log.debug(String.format("Cluster is excluded: [cluster] %s", cluster.getClusterId()));
+                            }
+                        }
+                    }
+                }
+            }
+
+            if (log.isInfoEnabled()) {
+                log.info("Topology initialized");
+            }
+
+            // Set topology initialized (this is the flag checked at the top of this branch)
+            topology.setInitialized(true);
+
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                // Not this processor's message type: ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            }
+            return false; // end of chain: an unhandled message type yields false here rather than an exception
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
deleted file mode 100644
index 0d2f0e2..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.*;
-import org.apache.stratos.messaging.event.topology.InstanceSpawnedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class InstanceSpawnedEventProcessor extends MessageProcessor {
-
-    private static final Log log = LogFactory.getLog(InstanceSpawnedEventProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology)object;
-
-        if (InstanceSpawnedEvent.class.getName().equals(type)) {
-            // Parse complete message and build event
-            InstanceSpawnedEvent event = (InstanceSpawnedEvent) Util.jsonToObject(message, InstanceSpawnedEvent.class);
-
-            // Apply service filter
-            if(ServiceFilter.getInstance().isActive()) {
-                if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return true;
-                }
-            }
-
-            // Apply cluster filter
-            if(ClusterFilter.getInstance().isActive()) {
-                if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return true;
-                }
-            }
-
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                throw new RuntimeException(String.format("Service does not exist: [service] %s",
-                        event.getServiceName()));
-            }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                        event.getServiceName(), event.getClusterId()));
-            }
-            if (cluster.memberExists(event.getMemberId())) {
-                throw new RuntimeException(String.format("Member already exists: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-
-            // Apply changes to the topology
-            Member member = new Member(event.getServiceName(), event.getClusterId(), event.getMemberId());
-            member.setStatus(MemberStatus.Created);
-            member.setPartitionId(event.getPartitionId());
-            cluster.addMember(member);
-
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Member created: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
new file mode 100644
index 0000000..f614782
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.event.topology.InstanceSpawnedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+public class InstanceSpawnedMessageProcessor extends MessageProcessor {
+    // Link in the message processor chain that handles InstanceSpawnedEvent messages for a Topology.
+    private static final Log log = LogFactory.getLog(InstanceSpawnedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+    /** Sets the processor that is asked to handle any message whose type this processor does not match. */
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+    /** Adds the spawned member to the given Topology and notifies listeners; returns true only if the event was applied. */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (InstanceSpawnedEvent.class.getName().equals(type)) {
+            // Topology has not been initialized yet: skip the event and report it as not processed
+            if (!topology.isInitialized())
+                return false;
+
+            // Build the InstanceSpawnedEvent from the message body via Util.jsonToObject
+            InstanceSpawnedEvent event = (InstanceSpawnedEvent) Util.jsonToObject(message, InstanceSpawnedEvent.class);
+
+            // Apply service filter (consulted only while the filter is active)
+            if (ServiceFilter.getInstance().isActive()) {
+                if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+                    // Service is excluded, do not update topology or fire event; report as not processed
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+                    }
+                    return false;
+                }
+            }
+
+            // Apply cluster filter (consulted only while the filter is active)
+            if (ClusterFilter.getInstance().isActive()) {
+                if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+                    // Cluster is excluded, do not update topology or fire event; report as not processed
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                    }
+                    return false;
+                }
+            }
+
+            // Validate event against the existing topology; each failed check logs a warning and returns false
+            Service service = topology.getService(event.getServiceName());
+            if (service == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Service does not exist: [service] %s",
+                            event.getServiceName()));
+                }
+                return false;
+            }
+            Cluster cluster = service.getCluster(event.getClusterId());
+            if (cluster == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                            event.getServiceName(), event.getClusterId()));
+                }
+                return false;
+            }
+            if (cluster.memberExists(event.getMemberId())) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Member already exists: [service] %s [cluster] %s [member] %s",
+                            event.getServiceName(),
+                            event.getClusterId(),
+                            event.getMemberId()));
+                }
+                return false;
+            }
+
+            // Apply changes to the topology: add a new member in Created status carrying the event's partition id
+            Member member = new Member(event.getServiceName(), event.getClusterId(), event.getMemberId());
+            member.setStatus(MemberStatus.Created);
+            member.setPartitionId(event.getPartitionId());
+            cluster.addMember(member);
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member created: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+
+            // Notify event listeners, only after the topology change above has been applied
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                // Not an InstanceSpawnedEvent: ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else { // no next processor is set, so nothing in the chain matched this message type
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
deleted file mode 100644
index def9545..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class MemberActivatedEventProcessor extends MessageProcessor {
-
-    private static final Log log = LogFactory.getLog(MemberActivatedEventProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology)object;
-
-        if (MemberActivatedEvent.class.getName().equals(type)) {
-            // Parse complete message and build event
-            MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(message, MemberActivatedEvent.class);
-
-            // Apply service filter
-            if(ServiceFilter.getInstance().isActive()) {
-                if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return true;
-                }
-            }
-
-            // Apply cluster filter
-            if(ClusterFilter.getInstance().isActive()) {
-                if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return true;
-                }
-            }
-
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                throw new RuntimeException(String.format("Service does not exist: [service] %s", event.getServiceName()));
-            }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                        event.getServiceName(), event.getClusterId()));
-            }
-            Member member = cluster.getMember(event.getMemberId());
-            if (member == null) {
-                throw new RuntimeException(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-            if (member.getStatus() == MemberStatus.Activated) {
-                throw new RuntimeException(String.format("Member already activated: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-            if((event.getMemberIp() == null) || event.getMemberIp().isEmpty()) {
-                throw new RuntimeException(String.format("No ip address found in member activated event: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-            if ((event.getPorts() == null) || (event.getPorts().size() == 0)) {
-                throw new RuntimeException(String.format("No ports found in member activated event: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-
-            // Apply changes to the topology
-            member.addPorts(event.getPorts());
-            member.setMemberIp(event.getMemberIp());
-            member.setStatus(MemberStatus.Activated);
-
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Member activated: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
new file mode 100644
index 0000000..e299b7d
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.MemberStatus;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+public class MemberActivatedMessageProcessor extends MessageProcessor {
+    // Link in the message processor chain that handles MemberActivatedEvent messages for a Topology.
+    private static final Log log = LogFactory.getLog(MemberActivatedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+    /** Sets the processor that is asked to handle any message whose type this processor does not match. */
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+    /** Marks the member as Activated in the given Topology and notifies listeners; returns true only if the event was applied. */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (MemberActivatedEvent.class.getName().equals(type)) {
+            // Topology has not been initialized yet: skip the event and report it as not processed
+            if (!topology.isInitialized())
+                return false;
+
+            // Build the MemberActivatedEvent from the message body via Util.jsonToObject
+            MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(message, MemberActivatedEvent.class);
+
+            // Apply service filter (consulted only while the filter is active)
+            if (ServiceFilter.getInstance().isActive()) {
+                if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+                    // Service is excluded, do not update topology or fire event; report as not processed
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+                    }
+                    return false;
+                }
+            }
+
+            // Apply cluster filter (consulted only while the filter is active)
+            if (ClusterFilter.getInstance().isActive()) {
+                if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+                    // Cluster is excluded, do not update topology or fire event; report as not processed
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                    }
+                    return false;
+                }
+            }
+
+            // Validate event properties; unlike the topology checks below, a missing ip or port list throws
+            if ((event.getMemberIp() == null) || event.getMemberIp().isEmpty()) {
+                throw new RuntimeException(String.format("No ip address found in member activated event: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+            if ((event.getPorts() == null) || (event.getPorts().size() == 0)) {
+                throw new RuntimeException(String.format("No ports found in member activated event: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+
+            // Validate event against the existing topology; each failed check logs a warning and returns false
+            Service service = topology.getService(event.getServiceName());
+            if (service == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
+                }
+                return false;
+            }
+            Cluster cluster = service.getCluster(event.getClusterId());
+            if (cluster == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                            event.getServiceName(), event.getClusterId()));
+                }
+                return false;
+            }
+            Member member = cluster.getMember(event.getMemberId());
+            if (member == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+                            event.getServiceName(),
+                            event.getClusterId(),
+                            event.getMemberId()));
+                }
+                return false;
+            }
+            if (member.getStatus() == MemberStatus.Activated) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Member already activated: [service] %s [cluster] %s [member] %s",
+                            event.getServiceName(),
+                            event.getClusterId(),
+                            event.getMemberId()));
+                }
+                return false;
+            }
+
+            // Apply changes to the topology: record the event's ports and ip on the member, then mark it Activated
+            member.addPorts(event.getPorts());
+            member.setMemberIp(event.getMemberIp());
+            member.setStatus(MemberStatus.Activated);
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member activated: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+
+            // Notify event listeners, only after the topology change above has been applied
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                // Not a MemberActivatedEvent: ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else { // no next processor is set, so nothing in the chain matched this message type
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
deleted file mode 100644
index 849efcc..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class MemberStartedEventProcessor extends MessageProcessor {
-
-    private static final Log log = LogFactory.getLog(MemberStartedEventProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology) object;
-
-        if (MemberStartedEvent.class.getName().equals(type)) {
-            // Parse complete message and build event
-            MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(message, MemberStartedEvent.class);
-
-            // Apply service filter
-            if(ServiceFilter.getInstance().isActive()) {
-                if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return true;
-                }
-            }
-
-            // Apply cluster filter
-            if(ClusterFilter.getInstance().isActive()) {
-                if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return true;
-                }
-            }
-
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                throw new RuntimeException(String.format("Service does not exist: [service] %s",
-                        event.getServiceName()));
-            }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                        event.getServiceName(), event.getClusterId()));
-            }
-            Member member = cluster.getMember(event.getMemberId());
-            if (member == null) {
-                throw new RuntimeException(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-            if (member.getStatus() == MemberStatus.Starting) {
-                throw new RuntimeException(String.format("Member already started: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-
-            // Apply changes to the topology
-            member.setStatus(MemberStatus.Starting);
-
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Member started: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
-            }
-
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-}