You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/11/05 12:10:38 UTC
[2/2] git commit: Introduced an event listener model to subscribe to
incoming events via event processors
Introduced an event listener model to subscribe to incoming events via event processors
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/d5b796ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/d5b796ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/d5b796ee
Branch: refs/heads/master
Commit: d5b796ee2ff9c93dbc06a01bdc0d15717a795f68
Parents: b2fe9d7
Author: Imesh Gunaratne <im...@apache.org>
Authored: Tue Nov 5 16:40:21 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Tue Nov 5 16:40:21 2013 +0530
----------------------------------------------------------------------
.../stratos/messaging/event/EventListener.java | 50 ++++++++++++++++++++
.../messaging/event/EventObservable.java | 41 ++++++++++++++++
.../message/processor/MessageProcessor.java | 43 +++++++++++++++++
.../topology/ClusterCreatedEventProcessor.java | 10 ++--
.../topology/ClusterRemovedEventProcessor.java | 10 ++--
.../CompleteTopologyEventProcessor.java | 11 +++--
.../topology/InstanceSpawnedEventProcessor.java | 10 ++--
.../topology/MemberActivatedEventProcessor.java | 10 ++--
.../topology/MemberStartedEventProcessor.java | 10 ++--
.../topology/MemberSuspendedEventProcessor.java | 10 ++--
.../MemberTerminatedEventProcessor.java | 10 ++--
.../topology/ServiceCreatedEventProcessor.java | 10 ++--
.../topology/ServiceRemovedEventProcessor.java | 11 +++--
.../topology/TopologyEventProcessor.java | 37 +++++++++++++++
.../topology/TopologyMessageProcessor.java | 43 -----------------
15 files changed, 233 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventListener.java
new file mode 100644
index 0000000..e495bf1
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventListener.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Observable;
+import java.util.Observer;
+
+/**
+ * Event listener definition: an Observer that forwards received Event arguments to eventReceived(Event).
+ */
+public abstract class EventListener implements Observer {
+ private static final Log log = LogFactory.getLog(EventListener.class);
+
+ @Override
+ public void update(Observable o, Object arg) {
+ if(arg instanceof Event) { // arguments that are not Event instances are silently ignored
+ Event event = (Event) arg;
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Event received: %s", event.getClass().getName()));
+ }
+ eventReceived(event);
+ }
+ }
+
+ /**
+ * Triggered when an event is received, i.e. when update() is called with an Event argument.
+ * @param event the event that was passed to update(Observable, Object) as its argument
+ */
+ protected abstract void eventReceived(Event event);
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventObservable.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventObservable.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventObservable.java
new file mode 100644
index 0000000..497f8eb
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventObservable.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event;
+
+import java.util.Observable;
+
+/**
+ * Event observable definition: exposes java.util.Observable through EventListener/Event typed methods.
+ */
+public abstract class EventObservable extends Observable {
+
+ public void addEventListener(EventListener eventListener) { // delegates to Observable.addObserver
+ addObserver(eventListener);
+ }
+
+ public void removeEventListener(EventListener eventListener) { // delegates to Observable.deleteObserver
+ deleteObserver(eventListener);
+ }
+
+ public void notifyEventListeners(Event event) {
+ setChanged(); // Observable.notifyObservers only notifies when the changed flag has been set
+ notifyObservers(event);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessor.java
new file mode 100644
index 0000000..7024cc5
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor;
+
+import org.apache.stratos.messaging.domain.topology.Topology;
+
+/**
+ * Message processor interface. Every Message Processor should implement this.
+ */
+public interface MessageProcessor {
+
+ /**
+ * Link a message processor and its successor, if there's any.
+ * @param nextProcessor the successor processor to link after this one
+ */
+ public abstract void setNext(MessageProcessor nextProcessor);
+
+ /**
+ * Message processing and delegating logic.
+ * @param type type of the message (the topology processors in this commit compare it to an event class name).
+ * @param message real message body.
+ * @param object Object that will get updated.
+ * @return whether the processing was successful or not.
+ */
+ public abstract boolean process(String type, String message, Object object);
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
index 450d65b..401bebd 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
@@ -24,20 +24,21 @@ import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
import org.apache.stratos.messaging.util.Util;
-public class ClusterCreatedEventProcessor implements TopologyMessageProcessor {
+public class ClusterCreatedEventProcessor extends TopologyEventProcessor {
private static final Log log = LogFactory.getLog(ClusterCreatedEventProcessor.class);
- private TopologyMessageProcessor nextMsgProcessor;
+ private MessageProcessor nextMsgProcessor;
@Override
- public void setNext(TopologyMessageProcessor nextProcessor) {
+ public void setNext(MessageProcessor nextProcessor) {
nextMsgProcessor = nextProcessor;
}
@Override
- public boolean process(String type, String message, Topology topology) {
+ public boolean processEvent(String type, String message, Topology topology) {
if (ClusterCreatedEvent.class.getName().equals(type)) {
// Parse complete message and build event
ClusterCreatedEvent event = (ClusterCreatedEvent) Util.jsonToObject(message, ClusterCreatedEvent.class);
@@ -64,6 +65,7 @@ public class ClusterCreatedEventProcessor implements TopologyMessageProcessor {
event.getServiceName(), event.getClusterId()));
}
+ notifyEventListeners(event);
return true;
} else {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
index 26160e7..8e97478 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
@@ -23,20 +23,21 @@ import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
import org.apache.stratos.messaging.util.Util;
-public class ClusterRemovedEventProcessor implements TopologyMessageProcessor {
+public class ClusterRemovedEventProcessor extends TopologyEventProcessor {
private static final Log log = LogFactory.getLog(ClusterRemovedEventProcessor.class);
- private TopologyMessageProcessor nextMsgProcessor;
+ private MessageProcessor nextMsgProcessor;
@Override
- public void setNext(TopologyMessageProcessor nextProcessor) {
+ public void setNext(MessageProcessor nextProcessor) {
nextMsgProcessor = nextProcessor;
}
@Override
- public boolean process(String type, String message, Topology topology) {
+ public boolean processEvent(String type, String message, Topology topology) {
if (ClusterRemovedEvent.class.getName().equals(type)) {
// Parse complete message and build event
ClusterRemovedEvent event = (ClusterRemovedEvent) Util.jsonToObject(message, ClusterRemovedEvent.class);
@@ -60,6 +61,7 @@ public class ClusterRemovedEventProcessor implements TopologyMessageProcessor {
event.getServiceName(), event.getClusterId()));
}
+ notifyEventListeners(event);
return true;
} else {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
index 80f6e02..a14b571 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
@@ -22,20 +22,21 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
import org.apache.stratos.messaging.util.Util;
-public class CompleteTopologyEventProcessor implements TopologyMessageProcessor {
+public class CompleteTopologyEventProcessor extends TopologyEventProcessor {
private static final Log log = LogFactory.getLog(CompleteTopologyEventProcessor.class);
- private TopologyMessageProcessor nextMsgProcessor;
+ private MessageProcessor nextMsgProcessor;
@Override
- public void setNext(TopologyMessageProcessor nextProcessor) {
+ public void setNext(MessageProcessor nextProcessor) {
nextMsgProcessor = nextProcessor;
}
@Override
- public boolean process(String type, String message, Topology topology) {
+ public boolean processEvent(String type, String message, Topology topology) {
if (CompleteTopologyEvent.class.getName().equals(type)) {
// Parse complete message and build event
CompleteTopologyEvent event = (CompleteTopologyEvent) Util.jsonToObject(message, CompleteTopologyEvent.class);
@@ -43,6 +44,8 @@ public class CompleteTopologyEventProcessor implements TopologyMessageProcessor
if (log.isInfoEnabled()) {
log.info("Topology initialized");
}
+
+ notifyEventListeners(event);
return true;
} else {
if (nextMsgProcessor != null) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
index c0de675..3d3d410 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
@@ -22,20 +22,21 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.topology.*;
import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
import org.apache.stratos.messaging.util.Util;
-public class InstanceSpawnedEventProcessor implements TopologyMessageProcessor {
+public class InstanceSpawnedEventProcessor extends TopologyEventProcessor {
private static final Log log = LogFactory.getLog(InstanceSpawnedEventProcessor.class);
- private TopologyMessageProcessor nextMsgProcessor;
+ private MessageProcessor nextMsgProcessor;
@Override
- public void setNext(TopologyMessageProcessor nextProcessor) {
+ public void setNext(MessageProcessor nextProcessor) {
nextMsgProcessor = nextProcessor;
}
@Override
- public boolean process(String type, String message, Topology topology) {
+ public boolean processEvent(String type, String message, Topology topology) {
if (MemberStartedEvent.class.getName().equals(type)) {
// Parse complete message and build event
MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(message, MemberStartedEvent.class);
@@ -70,6 +71,7 @@ public class InstanceSpawnedEventProcessor implements TopologyMessageProcessor {
event.getMemberId()));
}
+ notifyEventListeners(event);
return true;
} else {
if (nextMsgProcessor != null) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
index 700f992..0cf39a0 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
@@ -26,20 +26,21 @@ import org.apache.stratos.messaging.domain.topology.MemberStatus;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
import org.apache.stratos.messaging.util.Util;
-public class MemberActivatedEventProcessor implements TopologyMessageProcessor {
+public class MemberActivatedEventProcessor extends TopologyEventProcessor {
private static final Log log = LogFactory.getLog(MemberActivatedEventProcessor.class);
- private TopologyMessageProcessor nextMsgProcessor;
+ private MessageProcessor nextMsgProcessor;
@Override
- public void setNext(TopologyMessageProcessor nextProcessor) {
+ public void setNext(MessageProcessor nextProcessor) {
nextMsgProcessor = nextProcessor;
}
@Override
- public boolean process(String type, String message, Topology topology) {
+ public boolean processEvent(String type, String message, Topology topology) {
if (MemberActivatedEvent.class.getName().equals(type)) {
// Parse complete message and build event
MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(message, MemberActivatedEvent.class);
@@ -92,6 +93,7 @@ public class MemberActivatedEventProcessor implements TopologyMessageProcessor {
event.getMemberId()));
}
+ notifyEventListeners(event);
return true;
} else {
if (nextMsgProcessor != null) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
index 373d713..7d4f482 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
@@ -26,20 +26,21 @@ import org.apache.stratos.messaging.domain.topology.MemberStatus;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
import org.apache.stratos.messaging.util.Util;
-public class MemberStartedEventProcessor implements TopologyMessageProcessor {
+public class MemberStartedEventProcessor extends TopologyEventProcessor {
private static final Log log = LogFactory.getLog(MemberStartedEventProcessor.class);
- private TopologyMessageProcessor nextMsgProcessor;
+ private MessageProcessor nextMsgProcessor;
@Override
- public void setNext(TopologyMessageProcessor nextProcessor) {
+ public void setNext(MessageProcessor nextProcessor) {
nextMsgProcessor = nextProcessor;
}
@Override
- public boolean process(String type, String message, Topology topology) {
+ public boolean processEvent(String type, String message, Topology topology) {
try {
if (MemberStartedEvent.class.getName().equals(type)) {
// Parse complete message and build event
@@ -80,6 +81,7 @@ public class MemberStartedEventProcessor implements TopologyMessageProcessor {
event.getMemberId()));
}
+ notifyEventListeners(event);
return true;
} else {
if (nextMsgProcessor != null) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
index 32ffec7..df0eb9b 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
@@ -26,20 +26,21 @@ import org.apache.stratos.messaging.domain.topology.MemberStatus;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.MemberSuspendedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
import org.apache.stratos.messaging.util.Util;
-public class MemberSuspendedEventProcessor implements TopologyMessageProcessor {
+public class MemberSuspendedEventProcessor extends TopologyEventProcessor {
private static final Log log = LogFactory.getLog(MemberSuspendedEventProcessor.class);
- private TopologyMessageProcessor nextMsgProcessor;
+ private MessageProcessor nextMsgProcessor;
@Override
- public void setNext(TopologyMessageProcessor nextProcessor) {
+ public void setNext(MessageProcessor nextProcessor) {
nextMsgProcessor = nextProcessor;
}
@Override
- public boolean process(String type, String message, Topology topology) {
+ public boolean processEvent(String type, String message, Topology topology) {
if (MemberSuspendedEvent.class.getName().equals(type)) {
// Parse complete message and build event
MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(message, MemberSuspendedEvent.class);
@@ -79,6 +80,7 @@ public class MemberSuspendedEventProcessor implements TopologyMessageProcessor {
event.getMemberId()));
}
+ notifyEventListeners(event);
return true;
} else {
if (nextMsgProcessor != null) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
index bd8c235..7c3b0b5 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
@@ -26,20 +26,21 @@ import org.apache.stratos.messaging.domain.topology.MemberStatus;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
import org.apache.stratos.messaging.util.Util;
-public class MemberTerminatedEventProcessor implements TopologyMessageProcessor {
+public class MemberTerminatedEventProcessor extends TopologyEventProcessor {
private static final Log log = LogFactory.getLog(MemberTerminatedEventProcessor.class);
- private TopologyMessageProcessor nextMsgProcessor;
+ private MessageProcessor nextMsgProcessor;
@Override
- public void setNext(TopologyMessageProcessor nextProcessor) {
+ public void setNext(MessageProcessor nextProcessor) {
nextMsgProcessor = nextProcessor;
}
@Override
- public boolean process(String type, String message, Topology topology) {
+ public boolean processEvent(String type, String message, Topology topology) {
try {
if (MemberTerminatedEvent.class.getName().equals(type)) {
// Parse complete message and build event
@@ -79,6 +80,7 @@ public class MemberTerminatedEventProcessor implements TopologyMessageProcessor
event.getMemberId()));
}
+ notifyEventListeners(event);
return true;
} else {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
index 94b95d5..f4cbcc2 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
@@ -23,20 +23,21 @@ import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.ServiceCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
import org.apache.stratos.messaging.util.Util;
-public class ServiceCreatedEventProcessor implements TopologyMessageProcessor {
+public class ServiceCreatedEventProcessor extends TopologyEventProcessor {
private static final Log log = LogFactory.getLog(ServiceCreatedEventProcessor.class);
- private TopologyMessageProcessor nextMsgProcessor;
+ private MessageProcessor nextMsgProcessor;
@Override
- public void setNext(TopologyMessageProcessor nextProcessor) {
+ public void setNext(MessageProcessor nextProcessor) {
nextMsgProcessor = nextProcessor;
}
@Override
- public boolean process(String type, String message, Topology topology) {
+ public boolean processEvent(String type, String message, Topology topology) {
if (ServiceCreatedEvent.class.getName().equals(type)) {
// Parse complete message and build event
ServiceCreatedEvent event = (ServiceCreatedEvent) Util.jsonToObject(message, ServiceCreatedEvent.class);
@@ -54,6 +55,7 @@ public class ServiceCreatedEventProcessor implements TopologyMessageProcessor {
log.info(String.format("Service created: [service] %s", event.getServiceName()));
}
+ notifyEventListeners(event);
return true;
} else {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
index cdf307c..80c26b8 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
@@ -23,20 +23,21 @@ import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
import org.apache.stratos.messaging.util.Util;
-public class ServiceRemovedEventProcessor implements TopologyMessageProcessor {
+public class ServiceRemovedEventProcessor extends TopologyEventProcessor {
private static final Log log = LogFactory.getLog(ServiceRemovedEventProcessor.class);
- private TopologyMessageProcessor nextMsgProcessor;
+ private MessageProcessor nextMsgProcessor;
@Override
- public void setNext(TopologyMessageProcessor nextProcessor) {
+ public void setNext(MessageProcessor nextProcessor) {
nextMsgProcessor = nextProcessor;
}
@Override
- public boolean process(String type, String message, Topology topology) {
+ public boolean processEvent(String type, String message, Topology topology) {
if (ServiceRemovedEvent.class.getName().equals(type)) {
// Parse complete message and build event
ServiceRemovedEvent event = (ServiceRemovedEvent) Util.jsonToObject(message, ServiceRemovedEvent.class);
@@ -54,6 +55,8 @@ public class ServiceRemovedEventProcessor implements TopologyMessageProcessor {
if (log.isInfoEnabled()) {
log.info(String.format("Service removed: [service] %s", event.getServiceName()));
}
+
+ notifyEventListeners(event);
return true;
} else {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessor.java
new file mode 100644
index 0000000..506dc4d
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessor.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.EventObservable;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+
+/**
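+ * Base class for topology event processors: an EventObservable MessageProcessor whose process() casts the supplied object to a Topology and delegates to processEvent().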
+ */
+public abstract class TopologyEventProcessor extends EventObservable implements MessageProcessor {
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ return processEvent(type, message, (Topology) object);
+ }
+
+ protected abstract boolean processEvent(String type, String message, Topology object);
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d5b796ee/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessor.java
deleted file mode 100644
index f4b4da6..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessor.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.stratos.messaging.domain.topology.Topology;
-
-/**
- * Message processor interface. Every Message Processor should implement this.
- */
-public interface TopologyMessageProcessor {
-
- /**
- * Link a message processor and its successor, if there's any.
- * @param nextProcessor
- */
- public abstract void setNext(TopologyMessageProcessor nextProcessor);
-
- /**
- * Message processing and delegating logic.
- * @param type type of the message.
- * @param message real message body.
- * @param topology Topology that will get updated.
- * @return whether the processing was successful or not.
- */
- public abstract boolean process(String type, String message, Topology topology);
-}