You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/11/05 12:10:38 UTC

[2/2] git commit: Introduced an event listener model to subscribe to incoming events via event processors

Introduced an event listener model to subscribe to incoming events via event processors


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/d5b796ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/d5b796ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/d5b796ee

Branch: refs/heads/master
Commit: d5b796ee2ff9c93dbc06a01bdc0d15717a795f68
Parents: b2fe9d7
Author: Imesh Gunaratne <im...@apache.org>
Authored: Tue Nov 5 16:40:21 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Tue Nov 5 16:40:21 2013 +0530

----------------------------------------------------------------------
 .../stratos/messaging/event/EventListener.java  | 50 ++++++++++++++++++++
 .../messaging/event/EventObservable.java        | 41 ++++++++++++++++
 .../message/processor/MessageProcessor.java     | 43 +++++++++++++++++
 .../topology/ClusterCreatedEventProcessor.java  | 10 ++--
 .../topology/ClusterRemovedEventProcessor.java  | 10 ++--
 .../CompleteTopologyEventProcessor.java         | 11 +++--
 .../topology/InstanceSpawnedEventProcessor.java | 10 ++--
 .../topology/MemberActivatedEventProcessor.java | 10 ++--
 .../topology/MemberStartedEventProcessor.java   | 10 ++--
 .../topology/MemberSuspendedEventProcessor.java | 10 ++--
 .../MemberTerminatedEventProcessor.java         | 10 ++--
 .../topology/ServiceCreatedEventProcessor.java  | 10 ++--
 .../topology/ServiceRemovedEventProcessor.java  | 11 +++--
 .../topology/TopologyEventProcessor.java        | 37 +++++++++++++++
 .../topology/TopologyMessageProcessor.java      | 43 -----------------
 15 files changed, 233 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventListener.java
new file mode 100644
index 0000000..e495bf1
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventListener.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Observable;
+import java.util.Observer;
+
+/**
+ * Event listener: an Observer that forwards Event notifications to eventReceived(); other arguments are ignored.
+ */
+public abstract class EventListener implements Observer {
+    private static final Log log = LogFactory.getLog(EventListener.class);
+
+    @Override
+    public void update(Observable o, Object arg) {
+        if(arg instanceof Event) {
+            Event event = (Event) arg;
+            if(log.isDebugEnabled()) {
+                log.debug(String.format("Event received: %s", event.getClass().getName()));
+            }
+            eventReceived(event);
+        }
+    }
+
+    /**
+     * Invoked by update() for every notification whose argument is an Event, after the optional debug log entry.
+     * @param event the notification argument, cast to Event
+     */
+    protected abstract void eventReceived(Event event);
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventObservable.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventObservable.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventObservable.java
new file mode 100644
index 0000000..497f8eb
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventObservable.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event;
+
+import java.util.Observable;
+
+/**
+ * Event observable: typed wrapper over Observable; notifyEventListeners() calls setChanged() then notifyObservers(event).
+ */
+public abstract class EventObservable extends Observable {
+
+    public void addEventListener(EventListener eventListener) {
+        addObserver(eventListener);
+    }
+
+    public void removeEventListener(EventListener eventListener) {
+        deleteObserver(eventListener);
+    }
+
+    public void notifyEventListeners(Event event) {
+        setChanged();
+        notifyObservers(event);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessor.java
new file mode 100644
index 0000000..7024cc5
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor;
+
+import org.apache.stratos.messaging.domain.topology.Topology;
+
+/**
+ * Message processor interface: one link in a chain of processors. Every message processor should implement this.
+ */
+public interface MessageProcessor {
+    
+	/**
+	 * Links this message processor to its successor, if there is one.
+	 * @param nextProcessor the processor that follows this one in the chain
+	 */
+	public abstract void setNext(MessageProcessor nextProcessor);
+
+	/**
+	 * Message processing and delegating logic.
+	 * @param type type of the message; the topology processors compare it against an event class name.
+	 * @param message real message body; the topology processors parse it as JSON into the event.
+	 * @param object object that will get updated (presumably a Topology for topology processors - confirm).
+	 * @return whether the processing was successful or not.
+	 */
+	public abstract boolean process(String type, String message, Object object);
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
index 450d65b..401bebd 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
@@ -24,20 +24,21 @@ import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.util.Util;
 
-public class ClusterCreatedEventProcessor implements TopologyMessageProcessor {
+public class ClusterCreatedEventProcessor extends TopologyEventProcessor {
 
     private static final Log log = LogFactory.getLog(ClusterCreatedEventProcessor.class);
-    private TopologyMessageProcessor nextMsgProcessor;
+    private MessageProcessor nextMsgProcessor;
 
     @Override
-    public void setNext(TopologyMessageProcessor nextProcessor) {
+    public void setNext(MessageProcessor nextProcessor) {
         nextMsgProcessor = nextProcessor;
     }
 
     @Override
-    public boolean process(String type, String message, Topology topology) {
+    public boolean processEvent(String type, String message, Topology topology) {
         if (ClusterCreatedEvent.class.getName().equals(type)) {
             // Parse complete message and build event
             ClusterCreatedEvent event = (ClusterCreatedEvent) Util.jsonToObject(message, ClusterCreatedEvent.class);
@@ -64,6 +65,7 @@ public class ClusterCreatedEventProcessor implements TopologyMessageProcessor {
                          event.getServiceName(), event.getClusterId()));
             }
 
+            notifyEventListeners(event);
             return true;
 
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
index 26160e7..8e97478 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
@@ -23,20 +23,21 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.util.Util;
 
-public class ClusterRemovedEventProcessor implements TopologyMessageProcessor {
+public class ClusterRemovedEventProcessor extends TopologyEventProcessor {
 
     private static final Log log = LogFactory.getLog(ClusterRemovedEventProcessor.class);
-    private TopologyMessageProcessor nextMsgProcessor;
+    private MessageProcessor nextMsgProcessor;
 
     @Override
-    public void setNext(TopologyMessageProcessor nextProcessor) {
+    public void setNext(MessageProcessor nextProcessor) {
         nextMsgProcessor = nextProcessor;
     }
 
     @Override
-    public boolean process(String type, String message, Topology topology) {
+    public boolean processEvent(String type, String message, Topology topology) {
         if (ClusterRemovedEvent.class.getName().equals(type)) {
             // Parse complete message and build event
             ClusterRemovedEvent event = (ClusterRemovedEvent) Util.jsonToObject(message, ClusterRemovedEvent.class);
@@ -60,6 +61,7 @@ public class ClusterRemovedEventProcessor implements TopologyMessageProcessor {
                          event.getServiceName(), event.getClusterId()));
             }
 
+            notifyEventListeners(event);
             return true;
 
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
index 80f6e02..a14b571 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
@@ -22,20 +22,21 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.util.Util;
 
-public class CompleteTopologyEventProcessor implements TopologyMessageProcessor {
+public class CompleteTopologyEventProcessor extends TopologyEventProcessor {
 
     private static final Log log = LogFactory.getLog(CompleteTopologyEventProcessor.class);
-    private TopologyMessageProcessor nextMsgProcessor;
+    private MessageProcessor nextMsgProcessor;
 
     @Override
-    public void setNext(TopologyMessageProcessor nextProcessor) {
+    public void setNext(MessageProcessor nextProcessor) {
         nextMsgProcessor = nextProcessor;
     }
 
     @Override
-    public boolean process(String type, String message, Topology topology) {
+    public boolean processEvent(String type, String message, Topology topology) {
         if (CompleteTopologyEvent.class.getName().equals(type)) {
             // Parse complete message and build event
             CompleteTopologyEvent event = (CompleteTopologyEvent) Util.jsonToObject(message, CompleteTopologyEvent.class);
@@ -43,6 +44,8 @@ public class CompleteTopologyEventProcessor implements TopologyMessageProcessor
             if (log.isInfoEnabled()) {
                 log.info("Topology initialized");
             }
+
+            notifyEventListeners(event);
             return true;
         } else {
             if (nextMsgProcessor != null) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
index c0de675..3d3d410 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
@@ -22,20 +22,21 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.*;
 import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.util.Util;
 
-public class InstanceSpawnedEventProcessor implements TopologyMessageProcessor {
+public class InstanceSpawnedEventProcessor extends TopologyEventProcessor {
 
     private static final Log log = LogFactory.getLog(InstanceSpawnedEventProcessor.class);
-    private TopologyMessageProcessor nextMsgProcessor;
+    private MessageProcessor nextMsgProcessor;
 
     @Override
-    public void setNext(TopologyMessageProcessor nextProcessor) {
+    public void setNext(MessageProcessor nextProcessor) {
         nextMsgProcessor = nextProcessor;
     }
 
     @Override
-    public boolean process(String type, String message, Topology topology) {
+    public boolean processEvent(String type, String message, Topology topology) {
         if (MemberStartedEvent.class.getName().equals(type)) {
             // Parse complete message and build event
             MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(message, MemberStartedEvent.class);
@@ -70,6 +71,7 @@ public class InstanceSpawnedEventProcessor implements TopologyMessageProcessor {
                         event.getMemberId()));
             }
 
+            notifyEventListeners(event);
             return true;
         } else {
             if (nextMsgProcessor != null) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
index 700f992..0cf39a0 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
@@ -26,20 +26,21 @@ import org.apache.stratos.messaging.domain.topology.MemberStatus;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.util.Util;
 
-public class MemberActivatedEventProcessor implements TopologyMessageProcessor {
+public class MemberActivatedEventProcessor extends TopologyEventProcessor {
 
     private static final Log log = LogFactory.getLog(MemberActivatedEventProcessor.class);
-    private TopologyMessageProcessor nextMsgProcessor;
+    private MessageProcessor nextMsgProcessor;
 
     @Override
-    public void setNext(TopologyMessageProcessor nextProcessor) {
+    public void setNext(MessageProcessor nextProcessor) {
         nextMsgProcessor = nextProcessor;
     }
 
     @Override
-    public boolean process(String type, String message, Topology topology) {
+    public boolean processEvent(String type, String message, Topology topology) {
         if (MemberActivatedEvent.class.getName().equals(type)) {
             // Parse complete message and build event
             MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(message, MemberActivatedEvent.class);
@@ -92,6 +93,7 @@ public class MemberActivatedEventProcessor implements TopologyMessageProcessor {
                         event.getMemberId()));
             }
 
+            notifyEventListeners(event);
             return true;
         } else {
             if (nextMsgProcessor != null) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
index 373d713..7d4f482 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
@@ -26,20 +26,21 @@ import org.apache.stratos.messaging.domain.topology.MemberStatus;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.util.Util;
 
-public class MemberStartedEventProcessor implements TopologyMessageProcessor {
+public class MemberStartedEventProcessor extends TopologyEventProcessor {
 
 	private static final Log log = LogFactory.getLog(MemberStartedEventProcessor.class);
-	private TopologyMessageProcessor nextMsgProcessor;
+	private MessageProcessor nextMsgProcessor;
 
 	@Override
-	public void setNext(TopologyMessageProcessor nextProcessor) {
+	public void setNext(MessageProcessor nextProcessor) {
 		nextMsgProcessor = nextProcessor;
 	}
 
 	@Override
-	public boolean process(String type, String message, Topology topology) {
+	public boolean processEvent(String type, String message, Topology topology) {
 		try {
 			if (MemberStartedEvent.class.getName().equals(type)) {
 				// Parse complete message and build event
@@ -80,6 +81,7 @@ public class MemberStartedEventProcessor implements TopologyMessageProcessor {
                             event.getMemberId()));
 				}
 
+                notifyEventListeners(event);
 				return true;
 			} else {
 				if (nextMsgProcessor != null) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
index 32ffec7..df0eb9b 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
@@ -26,20 +26,21 @@ import org.apache.stratos.messaging.domain.topology.MemberStatus;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.MemberSuspendedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.util.Util;
 
-public class MemberSuspendedEventProcessor implements TopologyMessageProcessor {
+public class MemberSuspendedEventProcessor extends TopologyEventProcessor {
 
     private static final Log log = LogFactory.getLog(MemberSuspendedEventProcessor.class);
-    private TopologyMessageProcessor nextMsgProcessor;
+    private MessageProcessor nextMsgProcessor;
 
     @Override
-    public void setNext(TopologyMessageProcessor nextProcessor) {
+    public void setNext(MessageProcessor nextProcessor) {
         nextMsgProcessor = nextProcessor;
     }
 
     @Override
-    public boolean process(String type, String message, Topology topology) {
+    public boolean processEvent(String type, String message, Topology topology) {
         if (MemberSuspendedEvent.class.getName().equals(type)) {
             // Parse complete message and build event
             MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(message, MemberSuspendedEvent.class);
@@ -79,6 +80,7 @@ public class MemberSuspendedEventProcessor implements TopologyMessageProcessor {
                         event.getMemberId()));
             }
 
+            notifyEventListeners(event);
             return true;
         } else {
             if (nextMsgProcessor != null) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
index bd8c235..7c3b0b5 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
@@ -26,20 +26,21 @@ import org.apache.stratos.messaging.domain.topology.MemberStatus;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.util.Util;
 
-public class MemberTerminatedEventProcessor implements TopologyMessageProcessor {
+public class MemberTerminatedEventProcessor extends TopologyEventProcessor {
 
 	private static final Log log = LogFactory.getLog(MemberTerminatedEventProcessor.class);
-	private TopologyMessageProcessor nextMsgProcessor;
+	private MessageProcessor nextMsgProcessor;
 
 	@Override
-	public void setNext(TopologyMessageProcessor nextProcessor) {
+	public void setNext(MessageProcessor nextProcessor) {
 		nextMsgProcessor = nextProcessor;
 	}
 
 	@Override
-	public boolean process(String type, String message, Topology topology) {
+	public boolean processEvent(String type, String message, Topology topology) {
 		try {
 			if (MemberTerminatedEvent.class.getName().equals(type)) {
 				// Parse complete message and build event
@@ -79,6 +80,7 @@ public class MemberTerminatedEventProcessor implements TopologyMessageProcessor
                             event.getMemberId()));
 				}
 
+                notifyEventListeners(event);
 				return true;
 
 			} else {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
index 94b95d5..f4cbcc2 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
@@ -23,20 +23,21 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ServiceCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.util.Util;
 
-public class ServiceCreatedEventProcessor implements TopologyMessageProcessor {
+public class ServiceCreatedEventProcessor extends TopologyEventProcessor {
 
     private static final Log log = LogFactory.getLog(ServiceCreatedEventProcessor.class);
-    private TopologyMessageProcessor nextMsgProcessor;
+    private MessageProcessor nextMsgProcessor;
 
     @Override
-    public void setNext(TopologyMessageProcessor nextProcessor) {
+    public void setNext(MessageProcessor nextProcessor) {
         nextMsgProcessor = nextProcessor;
     }
 
     @Override
-    public boolean process(String type, String message, Topology topology) {
+    public boolean processEvent(String type, String message, Topology topology) {
         if (ServiceCreatedEvent.class.getName().equals(type)) {
             // Parse complete message and build event
             ServiceCreatedEvent event = (ServiceCreatedEvent) Util.jsonToObject(message, ServiceCreatedEvent.class);
@@ -54,6 +55,7 @@ public class ServiceCreatedEventProcessor implements TopologyMessageProcessor {
                 log.info(String.format("Service created: [service] %s", event.getServiceName()));
             }
 
+            notifyEventListeners(event);
             return true;
 
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
index cdf307c..80c26b8 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
@@ -23,20 +23,21 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.util.Util;
 
-public class ServiceRemovedEventProcessor implements TopologyMessageProcessor {
+public class ServiceRemovedEventProcessor extends TopologyEventProcessor {
 
     private static final Log log = LogFactory.getLog(ServiceRemovedEventProcessor.class);
-    private TopologyMessageProcessor nextMsgProcessor;
+    private MessageProcessor nextMsgProcessor;
 
     @Override
-    public void setNext(TopologyMessageProcessor nextProcessor) {
+    public void setNext(MessageProcessor nextProcessor) {
         nextMsgProcessor = nextProcessor;
     }
 
     @Override
-    public boolean process(String type, String message, Topology topology) {
+    public boolean processEvent(String type, String message, Topology topology) {
         if (ServiceRemovedEvent.class.getName().equals(type)) {
             // Parse complete message and build event
             ServiceRemovedEvent event = (ServiceRemovedEvent) Util.jsonToObject(message, ServiceRemovedEvent.class);
@@ -54,6 +55,8 @@ public class ServiceRemovedEventProcessor implements TopologyMessageProcessor {
             if (log.isInfoEnabled()) {
                 log.info(String.format("Service removed: [service] %s", event.getServiceName()));
             }
+
+            notifyEventListeners(event);
             return true;
 
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessor.java
new file mode 100644
index 0000000..506dc4d
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessor.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.EventObservable;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+
+/**
+ * Base topology event processor: casts the generic process() payload to Topology and delegates to processEvent().
+ */
+public abstract class TopologyEventProcessor extends EventObservable implements MessageProcessor {
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        return processEvent(type, message, (Topology) object); // unchecked downcast: a non-null, non-Topology payload throws ClassCastException
+    }
+
+    protected abstract boolean processEvent(String type, String message, Topology object);
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessor.java
deleted file mode 100644
index f4b4da6..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessor.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.stratos.messaging.domain.topology.Topology;
-
-/**
- * Message processor interface. Every Message Processor should implement this.
- */
-public interface TopologyMessageProcessor {
-    
-	/**
-	 * Link a message processor and its successor, if there's any.
-	 * @param nextProcessor
-	 */
-	public abstract void setNext(TopologyMessageProcessor nextProcessor);
-
-	/**
-	 * Message processing and delegating logic.
-	 * @param type type of the message. 
-	 * @param message real message body.
-	 * @param topology Topology that will get updated.
-	 * @return whether the processing was successful or not.
-	 */
-	public abstract boolean process(String type, String message, Topology topology);
-}