You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/11/05 19:26:58 UTC

git commit: Removed TopologyEventProcessor and replaced its usage with MessageProcessor, Renamed TopologyMessageProcessorChain to TopologyEventProcessorChain, Fixed exception handling and log messages

Updated Branches:
  refs/heads/master a2e0659b4 -> a103b79a8


Removed TopologyEventProcessor and replaced its usage with MessageProcessor, Renamed TopologyMessageProcessorChain to TopologyEventProcessorChain, Fixed exception handling and log messages


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/a103b79a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/a103b79a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/a103b79a

Branch: refs/heads/master
Commit: a103b79a8a135e39a0517a3d7dff110bc851b390
Parents: a2e0659
Author: Imesh Gunaratne <im...@wso2.com>
Authored: Tue Nov 5 23:56:48 2013 +0530
Committer: Imesh Gunaratne <im...@wso2.com>
Committed: Tue Nov 5 23:56:48 2013 +0530

----------------------------------------------------------------------
 .../message/processor/MessageProcessor.java     |   5 +-
 .../topology/ClusterCreatedEventProcessor.java  |  16 ++-
 .../topology/ClusterRemovedEventProcessor.java  |  15 ++-
 .../CompleteTopologyEventProcessor.java         |  15 ++-
 .../topology/InstanceSpawnedEventProcessor.java |  15 ++-
 .../topology/MemberActivatedEventProcessor.java |  15 ++-
 .../topology/MemberStartedEventProcessor.java   | 122 +++++++++----------
 .../topology/MemberSuspendedEventProcessor.java |  14 ++-
 .../MemberTerminatedEventProcessor.java         | 120 +++++++++---------
 .../topology/ServiceCreatedEventProcessor.java  |  14 ++-
 .../topology/ServiceRemovedEventProcessor.java  |  15 ++-
 .../topology/TopologyEventProcessor.java        |  37 ------
 .../topology/TopologyEventProcessorChain.java   |  47 +++++++
 .../topology/TopologyMessageProcessorChain.java |  47 -------
 .../topology/TopologyEventMessageDelegator.java |  12 +-
 .../org/apache/stratos/messaging/util/Util.java |   5 +-
 16 files changed, 245 insertions(+), 269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessor.java
index 7024cc5..b8e973d 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessor.java
@@ -20,11 +20,12 @@
 package org.apache.stratos.messaging.message.processor;
 
 import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.EventObservable;
 
 /**
- * Message processor interface. Every Message Processor should implement this.
+ * Message processor definition.
  */
-public interface MessageProcessor {
+public abstract class MessageProcessor extends EventObservable {
     
 	/**
 	 * Link a message processor and its successor, if there's any.

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
index 401bebd..b311e27 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
@@ -27,21 +27,24 @@ import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.util.Util;
 
-public class ClusterCreatedEventProcessor extends TopologyEventProcessor {
+public class ClusterCreatedEventProcessor extends MessageProcessor {
 
     private static final Log log = LogFactory.getLog(ClusterCreatedEventProcessor.class);
-    private MessageProcessor nextMsgProcessor;
+    private MessageProcessor nextProcessor;
 
     @Override
     public void setNext(MessageProcessor nextProcessor) {
-        nextMsgProcessor = nextProcessor;
+        this.nextProcessor = nextProcessor;
     }
 
     @Override
-    public boolean processEvent(String type, String message, Topology topology) {
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology)object;
+
         if (ClusterCreatedEvent.class.getName().equals(type)) {
             // Parse complete message and build event
             ClusterCreatedEvent event = (ClusterCreatedEvent) Util.jsonToObject(message, ClusterCreatedEvent.class);
+
             // Validate event against the existing topology
             Service service = topology.getService(event.getServiceName());
             if (service == null) {
@@ -65,13 +68,14 @@ public class ClusterCreatedEventProcessor extends TopologyEventProcessor {
                          event.getServiceName(), event.getClusterId()));
             }
 
+            // Notify event listeners
             notifyEventListeners(event);
             return true;
 
         } else {
-            if (nextMsgProcessor != null) {
+            if (nextProcessor != null) {
                 // ask the next processor to take care of the message.
-                return nextMsgProcessor.process(type, message, topology);
+                return nextProcessor.process(type, message, topology);
             } else {
                 throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
index 8e97478..689c062 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
@@ -26,18 +26,20 @@ import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.util.Util;
 
-public class ClusterRemovedEventProcessor extends TopologyEventProcessor {
+public class ClusterRemovedEventProcessor extends MessageProcessor {
 
     private static final Log log = LogFactory.getLog(ClusterRemovedEventProcessor.class);
-    private MessageProcessor nextMsgProcessor;
+    private MessageProcessor nextProcessor;
 
     @Override
     public void setNext(MessageProcessor nextProcessor) {
-        nextMsgProcessor = nextProcessor;
+        this.nextProcessor = nextProcessor;
     }
 
     @Override
-    public boolean processEvent(String type, String message, Topology topology) {
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology)object;
+
         if (ClusterRemovedEvent.class.getName().equals(type)) {
             // Parse complete message and build event
             ClusterRemovedEvent event = (ClusterRemovedEvent) Util.jsonToObject(message, ClusterRemovedEvent.class);
@@ -61,13 +63,14 @@ public class ClusterRemovedEventProcessor extends TopologyEventProcessor {
                          event.getServiceName(), event.getClusterId()));
             }
 
+            // Notify event listeners
             notifyEventListeners(event);
             return true;
 
         } else {
-            if (nextMsgProcessor != null) {
+            if (nextProcessor != null) {
                 // ask the next processor to take care of the message.
-                return nextMsgProcessor.process(type, message, topology);
+                return nextProcessor.process(type, message, topology);
             } else {
                 throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
index a14b571..488a3eb 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
@@ -25,18 +25,20 @@ import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.util.Util;
 
-public class CompleteTopologyEventProcessor extends TopologyEventProcessor {
+public class CompleteTopologyEventProcessor extends MessageProcessor {
 
     private static final Log log = LogFactory.getLog(CompleteTopologyEventProcessor.class);
-    private MessageProcessor nextMsgProcessor;
+    private MessageProcessor nextProcessor;
 
     @Override
     public void setNext(MessageProcessor nextProcessor) {
-        nextMsgProcessor = nextProcessor;
+        this.nextProcessor = nextProcessor;
     }
 
     @Override
-    public boolean processEvent(String type, String message, Topology topology) {
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology)object;
+
         if (CompleteTopologyEvent.class.getName().equals(type)) {
             // Parse complete message and build event
             CompleteTopologyEvent event = (CompleteTopologyEvent) Util.jsonToObject(message, CompleteTopologyEvent.class);
@@ -45,12 +47,13 @@ public class CompleteTopologyEventProcessor extends TopologyEventProcessor {
                 log.info("Topology initialized");
             }
 
+            // Notify event listeners
             notifyEventListeners(event);
             return true;
         } else {
-            if (nextMsgProcessor != null) {
+            if (nextProcessor != null) {
                 // ask the next processor to take care of the message.
-                return nextMsgProcessor.process(type, message, topology);
+                return nextProcessor.process(type, message, topology);
             }
             return false;
         }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
index 3d3d410..5576d75 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
@@ -25,18 +25,20 @@ import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.util.Util;
 
-public class InstanceSpawnedEventProcessor extends TopologyEventProcessor {
+public class InstanceSpawnedEventProcessor extends MessageProcessor {
 
     private static final Log log = LogFactory.getLog(InstanceSpawnedEventProcessor.class);
-    private MessageProcessor nextMsgProcessor;
+    private MessageProcessor nextProcessor;
 
     @Override
     public void setNext(MessageProcessor nextProcessor) {
-        nextMsgProcessor = nextProcessor;
+        this.nextProcessor = nextProcessor;
     }
 
     @Override
-    public boolean processEvent(String type, String message, Topology topology) {
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology)object;
+
         if (MemberStartedEvent.class.getName().equals(type)) {
             // Parse complete message and build event
             MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(message, MemberStartedEvent.class);
@@ -71,12 +73,13 @@ public class InstanceSpawnedEventProcessor extends TopologyEventProcessor {
                         event.getMemberId()));
             }
 
+            // Notify event listeners
             notifyEventListeners(event);
             return true;
         } else {
-            if (nextMsgProcessor != null) {
+            if (nextProcessor != null) {
                 // ask the next processor to take care of the message.
-                return nextMsgProcessor.process(type, message, topology);
+                return nextProcessor.process(type, message, topology);
             } else {
                 throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
index 0cf39a0..563b2ab 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
@@ -29,18 +29,20 @@ import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.util.Util;
 
-public class MemberActivatedEventProcessor extends TopologyEventProcessor {
+public class MemberActivatedEventProcessor extends MessageProcessor {
 
     private static final Log log = LogFactory.getLog(MemberActivatedEventProcessor.class);
-    private MessageProcessor nextMsgProcessor;
+    private MessageProcessor nextProcessor;
 
     @Override
     public void setNext(MessageProcessor nextProcessor) {
-        nextMsgProcessor = nextProcessor;
+        this.nextProcessor = nextProcessor;
     }
 
     @Override
-    public boolean processEvent(String type, String message, Topology topology) {
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology)object;
+
         if (MemberActivatedEvent.class.getName().equals(type)) {
             // Parse complete message and build event
             MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(message, MemberActivatedEvent.class);
@@ -93,12 +95,13 @@ public class MemberActivatedEventProcessor extends TopologyEventProcessor {
                         event.getMemberId()));
             }
 
+            // Notify event listeners
             notifyEventListeners(event);
             return true;
         } else {
-            if (nextMsgProcessor != null) {
+            if (nextProcessor != null) {
                 // ask the next processor to take care of the message.
-                return nextMsgProcessor.process(type, message, topology);
+                return nextProcessor.process(type, message, topology);
             } else {
                 throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
index 7d4f482..9745c32 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
@@ -29,75 +29,69 @@ import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.util.Util;
 
-public class MemberStartedEventProcessor extends TopologyEventProcessor {
+public class MemberStartedEventProcessor extends MessageProcessor {
 
-	private static final Log log = LogFactory.getLog(MemberStartedEventProcessor.class);
-	private MessageProcessor nextMsgProcessor;
+    private static final Log log = LogFactory.getLog(MemberStartedEventProcessor.class);
+    private MessageProcessor nextProcessor;
 
-	@Override
-	public void setNext(MessageProcessor nextProcessor) {
-		nextMsgProcessor = nextProcessor;
-	}
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
 
-	@Override
-	public boolean processEvent(String type, String message, Topology topology) {
-		try {
-			if (MemberStartedEvent.class.getName().equals(type)) {
-				// Parse complete message and build event
-				MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(message, MemberStartedEvent.class);
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
 
-				// Validate event against the existing topology
-				Service service = topology.getService(event.getServiceName());
-				if (service == null) {
-					throw new RuntimeException(String.format("Service does not exist: [service] %s",
-					                                         event.getServiceName()));
-				}
-				Cluster cluster = service.getCluster(event.getClusterId());
-				if (cluster == null) {
-					throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                                                             event.getServiceName(), event.getClusterId()));
-				}
-                Member member = cluster.getMember(event.getMemberId());
-                if (member == null) {
-                    throw new RuntimeException(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-                if (member.getStatus() == MemberStatus.Starting) {
-                    throw new RuntimeException(String.format("Member already started: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
+        if (MemberStartedEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(message, MemberStartedEvent.class);
 
-                // Apply changes to the topology
-                member.setStatus(MemberStatus.Starting);
+            // Validate event against the existing topology
+            Service service = topology.getService(event.getServiceName());
+            if (service == null) {
+                throw new RuntimeException(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
+            }
+            Cluster cluster = service.getCluster(event.getClusterId());
+            if (cluster == null) {
+                throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
+            }
+            Member member = cluster.getMember(event.getMemberId());
+            if (member == null) {
+                throw new RuntimeException(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+            if (member.getStatus() == MemberStatus.Starting) {
+                throw new RuntimeException(String.format("Member already started: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
 
-				if (log.isInfoEnabled()) {
-					log.info(String.format("Member started: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-				}
+            // Apply changes to the topology
+            member.setStatus(MemberStatus.Starting);
 
-                notifyEventListeners(event);
-				return true;
-			} else {
-				if (nextMsgProcessor != null) {
-					// ask the next processor to take care of the message.
-					return nextMsgProcessor.process(type, message, topology);
-				}
-			}
-		} catch (Exception e) {
-			if (nextMsgProcessor != null) {
-				// ask the next processor to take care of the message.
-				return nextMsgProcessor.process(type, message, topology);
-			} else {
-				throw new RuntimeException(String.format("Failed to process the message: %s of type %s using any of the available processors.",
-				                                         message, type));
-			}
-		}
-		return false;
-	}
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member started: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
index df0eb9b..a4a5ef6 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
@@ -29,18 +29,20 @@ import org.apache.stratos.messaging.event.topology.MemberSuspendedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.util.Util;
 
-public class MemberSuspendedEventProcessor extends TopologyEventProcessor {
+public class MemberSuspendedEventProcessor extends MessageProcessor {
 
     private static final Log log = LogFactory.getLog(MemberSuspendedEventProcessor.class);
-    private MessageProcessor nextMsgProcessor;
+    private MessageProcessor nextProcessor;
 
     @Override
     public void setNext(MessageProcessor nextProcessor) {
-        nextMsgProcessor = nextProcessor;
+        this.nextProcessor = nextProcessor;
     }
 
     @Override
-    public boolean processEvent(String type, String message, Topology topology) {
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
         if (MemberSuspendedEvent.class.getName().equals(type)) {
             // Parse complete message and build event
             MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(message, MemberSuspendedEvent.class);
@@ -83,9 +85,9 @@ public class MemberSuspendedEventProcessor extends TopologyEventProcessor {
             notifyEventListeners(event);
             return true;
         } else {
-            if (nextMsgProcessor != null) {
+            if (nextProcessor != null) {
                 // ask the next processor to take care of the message.
-                return nextMsgProcessor.process(type, message, topology);
+                return nextProcessor.process(type, message, topology);
             } else {
                 throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
index 7c3b0b5..87862fc 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
@@ -29,76 +29,68 @@ import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.util.Util;
 
-public class MemberTerminatedEventProcessor extends TopologyEventProcessor {
+public class MemberTerminatedEventProcessor extends MessageProcessor {
 
-	private static final Log log = LogFactory.getLog(MemberTerminatedEventProcessor.class);
-	private MessageProcessor nextMsgProcessor;
+    private static final Log log = LogFactory.getLog(MemberTerminatedEventProcessor.class);
+    private MessageProcessor nextProcessor;
 
-	@Override
-	public void setNext(MessageProcessor nextProcessor) {
-		nextMsgProcessor = nextProcessor;
-	}
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
 
-	@Override
-	public boolean processEvent(String type, String message, Topology topology) {
-		try {
-			if (MemberTerminatedEvent.class.getName().equals(type)) {
-				// Parse complete message and build event
-				MemberTerminatedEvent event = (MemberTerminatedEvent) Util.jsonToObject(message, MemberTerminatedEvent.class);
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
 
-				// Validate event against the existing topology
-				Service service = topology.getService(event.getServiceName());
-				if (service == null) {
-					throw new RuntimeException(String.format("Service does not exist: [service] %s", event.getServiceName()));
-				}
-				Cluster cluster = service.getCluster(event.getClusterId());
-				if (cluster == null) {
-					throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                            event.getServiceName(), event.getClusterId()));
-				}
-				Member member = cluster.getMember(event.getMemberId());
-				if (member == null) {
-					throw new RuntimeException(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-				}
-				if (member.getStatus() == MemberStatus.Terminated) {
-					throw new RuntimeException(String.format("Member already terminated: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-				}
+        if (MemberTerminatedEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            MemberTerminatedEvent event = (MemberTerminatedEvent) Util.jsonToObject(message, MemberTerminatedEvent.class);
 
-				// Apply changes to the topology
-				member.setStatus(MemberStatus.Terminated);
-				
-				if (log.isInfoEnabled()) {
-					log.info(String.format("Member terminated: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-				}
+            // Validate event against the existing topology
+            Service service = topology.getService(event.getServiceName());
+            if (service == null) {
+                throw new RuntimeException(String.format("Service does not exist: [service] %s", event.getServiceName()));
+            }
+            Cluster cluster = service.getCluster(event.getClusterId());
+            if (cluster == null) {
+                throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
+            }
+            Member member = cluster.getMember(event.getMemberId());
+            if (member == null) {
+                throw new RuntimeException(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+            if (member.getStatus() == MemberStatus.Terminated) {
+                throw new RuntimeException(String.format("Member already terminated: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
 
-                notifyEventListeners(event);
-				return true;
+            // Apply changes to the topology
+            member.setStatus(MemberStatus.Terminated);
 
-			} else {
-				if (nextMsgProcessor != null) {
-					// ask the next processor to take care of the message.
-					return nextMsgProcessor.process(type, message, topology);
-				}
-			}
-		} catch (Exception e) {
-			if (nextMsgProcessor != null) {
-				// ask the next processor to take care of the message.
-				return nextMsgProcessor.process(type, message, topology);
-			} else {
-				throw new RuntimeException(String.format("Failed to process the message: %s of type %s using any of the available processors.",
-				                                         message, type));
-			}
-		}
-		return false;
-	}
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member terminated: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
 
+            notifyEventListeners(event);
+            return true;
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
index f4cbcc2..911c006 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
@@ -26,18 +26,20 @@ import org.apache.stratos.messaging.event.topology.ServiceCreatedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.util.Util;
 
-public class ServiceCreatedEventProcessor extends TopologyEventProcessor {
+public class ServiceCreatedEventProcessor extends MessageProcessor {
 
     private static final Log log = LogFactory.getLog(ServiceCreatedEventProcessor.class);
-    private MessageProcessor nextMsgProcessor;
+    private MessageProcessor nextProcessor;
 
     @Override
     public void setNext(MessageProcessor nextProcessor) {
-        nextMsgProcessor = nextProcessor;
+        this.nextProcessor = nextProcessor;
     }
 
     @Override
-    public boolean processEvent(String type, String message, Topology topology) {
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology)object;
+
         if (ServiceCreatedEvent.class.getName().equals(type)) {
             // Parse complete message and build event
             ServiceCreatedEvent event = (ServiceCreatedEvent) Util.jsonToObject(message, ServiceCreatedEvent.class);
@@ -59,9 +61,9 @@ public class ServiceCreatedEventProcessor extends TopologyEventProcessor {
             return true;
 
         } else {
-            if (nextMsgProcessor != null) {
+            if (nextProcessor != null) {
                 // ask the next processor to take care of the message.
-                return nextMsgProcessor.process(type, message, topology);
+                return nextProcessor.process(type, message, topology);
             } else {
                 throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
index 80c26b8..289d318 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
@@ -26,18 +26,20 @@ import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.util.Util;
 
-public class ServiceRemovedEventProcessor extends TopologyEventProcessor {
+public class ServiceRemovedEventProcessor extends MessageProcessor {
 
     private static final Log log = LogFactory.getLog(ServiceRemovedEventProcessor.class);
-    private MessageProcessor nextMsgProcessor;
+    private MessageProcessor nextProcessor;
 
     @Override
     public void setNext(MessageProcessor nextProcessor) {
-        nextMsgProcessor = nextProcessor;
+        this.nextProcessor = nextProcessor;
     }
 
     @Override
-    public boolean processEvent(String type, String message, Topology topology) {
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology)object;
+
         if (ServiceRemovedEvent.class.getName().equals(type)) {
             // Parse complete message and build event
             ServiceRemovedEvent event = (ServiceRemovedEvent) Util.jsonToObject(message, ServiceRemovedEvent.class);
@@ -56,13 +58,14 @@ public class ServiceRemovedEventProcessor extends TopologyEventProcessor {
                 log.info(String.format("Service removed: [service] %s", event.getServiceName()));
             }
 
+            // Notify event listeners
             notifyEventListeners(event);
             return true;
 
         } else {
-            if (nextMsgProcessor != null) {
+            if (nextProcessor != null) {
                 // ask the next processor to take care of the message.
-                return nextMsgProcessor.process(type, message, topology);
+                return nextProcessor.process(type, message, topology);
             } else {
                 throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessor.java
deleted file mode 100644
index 506dc4d..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessor.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.EventObservable;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-
-/**
- *
- */
-public abstract class TopologyEventProcessor extends EventObservable implements MessageProcessor {
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        return processEvent(type, message, (Topology) object);
-    }
-
-    protected abstract boolean processEvent(String type, String message, Topology object);
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessorChain.java
new file mode 100644
index 0000000..917f7c9
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessorChain.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+
+/**
+ * Defines default topology message processor chain.
+ */
+public class TopologyEventProcessorChain extends MessageProcessorChain {
+    private static final Log log = LogFactory.getLog(TopologyEventProcessorChain.class);
+
+    public void initialize() {
+        // Add topology event processors
+        add(new ServiceCreatedEventProcessor());
+        add(new ServiceRemovedEventProcessor());
+        add(new ClusterCreatedEventProcessor());
+        add(new ClusterRemovedEventProcessor());
+        add(new InstanceSpawnedEventProcessor());
+        add(new MemberStartedEventProcessor());
+        add(new MemberActivatedEventProcessor());
+        add(new MemberSuspendedEventProcessor());
+        add(new MemberTerminatedEventProcessor());
+        if(log.isDebugEnabled()) {
+            log.debug("Topology message processor chain initialized");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
deleted file mode 100644
index 0281930..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
-
-/**
- * Defines default topology message processor chain.
- */
-public class TopologyMessageProcessorChain extends MessageProcessorChain {
-    private static final Log log = LogFactory.getLog(TopologyMessageProcessorChain.class);
-
-    public void initialize() {
-        // Add topology event processors
-        add(new ServiceCreatedEventProcessor());
-        add(new ServiceRemovedEventProcessor());
-        add(new ClusterCreatedEventProcessor());
-        add(new ClusterRemovedEventProcessor());
-        add(new InstanceSpawnedEventProcessor());
-        add(new MemberStartedEventProcessor());
-        add(new MemberActivatedEventProcessor());
-        add(new MemberSuspendedEventProcessor());
-        add(new MemberTerminatedEventProcessor());
-        if(log.isDebugEnabled()) {
-            log.debug("Topology message processor chain initialized");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
index 9b82b27..6986a28 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
@@ -39,21 +39,21 @@ import org.apache.stratos.messaging.util.Constants;
 public class TopologyEventMessageDelegator implements Runnable {
 
     private static final Log log = LogFactory.getLog(TopologyEventMessageDelegator.class);
-    private CompleteTopologyEventProcessor completeTopologyEventProcessor;
+    private CompleteTopologyEventProcessor completeTopEvMsgProcessor;
     private MessageProcessorChain processorChain;
 
     public TopologyEventMessageDelegator() {
-        this.completeTopologyEventProcessor = new CompleteTopologyEventProcessor();
-        this.processorChain = new TopologyMessageProcessorChain();
+        this.completeTopEvMsgProcessor = new CompleteTopologyEventProcessor();
+        this.processorChain = new TopologyEventProcessorChain();
     }
 
     public TopologyEventMessageDelegator(MessageProcessorChain processorChain) {
-        this.completeTopologyEventProcessor = new CompleteTopologyEventProcessor();
+        this.completeTopEvMsgProcessor = new CompleteTopologyEventProcessor();
         this.processorChain = processorChain;
     }
 
     public void addCompleteTopologyEventListener(EventListener eventListener) {
-        completeTopologyEventProcessor.addEventListener(eventListener);
+        completeTopEvMsgProcessor.addEventListener(eventListener);
     }
 
     @Override
@@ -72,7 +72,7 @@ public class TopologyEventMessageDelegator implements Runnable {
                     // Retrieve the actual message
                     String json = message.getText();
 
-                    if (completeTopologyEventProcessor.process(type, json, TopologyManager.getTopology())) {
+                    if (completeTopEvMsgProcessor.process(type, json, TopologyManager.getTopology())) {
                         break;
                     }
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java
index fa2b27b..eb0035e 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java
@@ -44,7 +44,9 @@ public class Util {
 			log.error("Failed to load properties from file: " + filePath, e);
 		} finally {
 			try {
-				is.close();
+                if(is != null) {
+				    is.close();
+                }
 			} catch (IOException ignore) {
 			}
 		}
@@ -88,6 +90,7 @@ public class Util {
             return true;
         }
         catch (NumberFormatException e) {
+            // Not a valid number
         }
         return false;
     }