You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/11/05 19:26:58 UTC
git commit: Removed TopologyEventProcessor and replaced its usage
with MessageProcessor; renamed TopologyMessageProcessorChain to
TopologyEventProcessorChain; fixed exception handling and log messages.
Updated Branches:
refs/heads/master a2e0659b4 -> a103b79a8
Removed TopologyEventProcessor and replaced its usage with MessageProcessor; renamed TopologyMessageProcessorChain to TopologyEventProcessorChain; fixed exception handling and log messages.
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/a103b79a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/a103b79a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/a103b79a
Branch: refs/heads/master
Commit: a103b79a8a135e39a0517a3d7dff110bc851b390
Parents: a2e0659
Author: Imesh Gunaratne <im...@wso2.com>
Authored: Tue Nov 5 23:56:48 2013 +0530
Committer: Imesh Gunaratne <im...@wso2.com>
Committed: Tue Nov 5 23:56:48 2013 +0530
----------------------------------------------------------------------
.../message/processor/MessageProcessor.java | 5 +-
.../topology/ClusterCreatedEventProcessor.java | 16 ++-
.../topology/ClusterRemovedEventProcessor.java | 15 ++-
.../CompleteTopologyEventProcessor.java | 15 ++-
.../topology/InstanceSpawnedEventProcessor.java | 15 ++-
.../topology/MemberActivatedEventProcessor.java | 15 ++-
.../topology/MemberStartedEventProcessor.java | 122 +++++++++----------
.../topology/MemberSuspendedEventProcessor.java | 14 ++-
.../MemberTerminatedEventProcessor.java | 120 +++++++++---------
.../topology/ServiceCreatedEventProcessor.java | 14 ++-
.../topology/ServiceRemovedEventProcessor.java | 15 ++-
.../topology/TopologyEventProcessor.java | 37 ------
.../topology/TopologyEventProcessorChain.java | 47 +++++++
.../topology/TopologyMessageProcessorChain.java | 47 -------
.../topology/TopologyEventMessageDelegator.java | 12 +-
.../org/apache/stratos/messaging/util/Util.java | 5 +-
16 files changed, 245 insertions(+), 269 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessor.java
index 7024cc5..b8e973d 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessor.java
@@ -20,11 +20,12 @@
package org.apache.stratos.messaging.message.processor;
import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.EventObservable;
/**
- * Message processor interface. Every Message Processor should implement this.
+ * Message processor definition.
*/
-public interface MessageProcessor {
+public abstract class MessageProcessor extends EventObservable {
/**
* Link a message processor and its successor, if there's any.
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
index 401bebd..b311e27 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
@@ -27,21 +27,24 @@ import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
import org.apache.stratos.messaging.util.Util;
-public class ClusterCreatedEventProcessor extends TopologyEventProcessor {
+public class ClusterCreatedEventProcessor extends MessageProcessor {
private static final Log log = LogFactory.getLog(ClusterCreatedEventProcessor.class);
- private MessageProcessor nextMsgProcessor;
+ private MessageProcessor nextProcessor;
@Override
public void setNext(MessageProcessor nextProcessor) {
- nextMsgProcessor = nextProcessor;
+ this.nextProcessor = nextProcessor;
}
@Override
- public boolean processEvent(String type, String message, Topology topology) {
+ public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology)object;
+
if (ClusterCreatedEvent.class.getName().equals(type)) {
// Parse complete message and build event
ClusterCreatedEvent event = (ClusterCreatedEvent) Util.jsonToObject(message, ClusterCreatedEvent.class);
+
// Validate event against the existing topology
Service service = topology.getService(event.getServiceName());
if (service == null) {
@@ -65,13 +68,14 @@ public class ClusterCreatedEventProcessor extends TopologyEventProcessor {
event.getServiceName(), event.getClusterId()));
}
+ // Notify event listeners
notifyEventListeners(event);
return true;
} else {
- if (nextMsgProcessor != null) {
+ if (nextProcessor != null) {
// ask the next processor to take care of the message.
- return nextMsgProcessor.process(type, message, topology);
+ return nextProcessor.process(type, message, topology);
} else {
throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
index 8e97478..689c062 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
@@ -26,18 +26,20 @@ import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
import org.apache.stratos.messaging.util.Util;
-public class ClusterRemovedEventProcessor extends TopologyEventProcessor {
+public class ClusterRemovedEventProcessor extends MessageProcessor {
private static final Log log = LogFactory.getLog(ClusterRemovedEventProcessor.class);
- private MessageProcessor nextMsgProcessor;
+ private MessageProcessor nextProcessor;
@Override
public void setNext(MessageProcessor nextProcessor) {
- nextMsgProcessor = nextProcessor;
+ this.nextProcessor = nextProcessor;
}
@Override
- public boolean processEvent(String type, String message, Topology topology) {
+ public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology)object;
+
if (ClusterRemovedEvent.class.getName().equals(type)) {
// Parse complete message and build event
ClusterRemovedEvent event = (ClusterRemovedEvent) Util.jsonToObject(message, ClusterRemovedEvent.class);
@@ -61,13 +63,14 @@ public class ClusterRemovedEventProcessor extends TopologyEventProcessor {
event.getServiceName(), event.getClusterId()));
}
+ // Notify event listeners
notifyEventListeners(event);
return true;
} else {
- if (nextMsgProcessor != null) {
+ if (nextProcessor != null) {
// ask the next processor to take care of the message.
- return nextMsgProcessor.process(type, message, topology);
+ return nextProcessor.process(type, message, topology);
} else {
throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
index a14b571..488a3eb 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
@@ -25,18 +25,20 @@ import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
import org.apache.stratos.messaging.util.Util;
-public class CompleteTopologyEventProcessor extends TopologyEventProcessor {
+public class CompleteTopologyEventProcessor extends MessageProcessor {
private static final Log log = LogFactory.getLog(CompleteTopologyEventProcessor.class);
- private MessageProcessor nextMsgProcessor;
+ private MessageProcessor nextProcessor;
@Override
public void setNext(MessageProcessor nextProcessor) {
- nextMsgProcessor = nextProcessor;
+ this.nextProcessor = nextProcessor;
}
@Override
- public boolean processEvent(String type, String message, Topology topology) {
+ public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology)object;
+
if (CompleteTopologyEvent.class.getName().equals(type)) {
// Parse complete message and build event
CompleteTopologyEvent event = (CompleteTopologyEvent) Util.jsonToObject(message, CompleteTopologyEvent.class);
@@ -45,12 +47,13 @@ public class CompleteTopologyEventProcessor extends TopologyEventProcessor {
log.info("Topology initialized");
}
+ // Notify event listeners
notifyEventListeners(event);
return true;
} else {
- if (nextMsgProcessor != null) {
+ if (nextProcessor != null) {
// ask the next processor to take care of the message.
- return nextMsgProcessor.process(type, message, topology);
+ return nextProcessor.process(type, message, topology);
}
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
index 3d3d410..5576d75 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
@@ -25,18 +25,20 @@ import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
import org.apache.stratos.messaging.util.Util;
-public class InstanceSpawnedEventProcessor extends TopologyEventProcessor {
+public class InstanceSpawnedEventProcessor extends MessageProcessor {
private static final Log log = LogFactory.getLog(InstanceSpawnedEventProcessor.class);
- private MessageProcessor nextMsgProcessor;
+ private MessageProcessor nextProcessor;
@Override
public void setNext(MessageProcessor nextProcessor) {
- nextMsgProcessor = nextProcessor;
+ this.nextProcessor = nextProcessor;
}
@Override
- public boolean processEvent(String type, String message, Topology topology) {
+ public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology)object;
+
if (MemberStartedEvent.class.getName().equals(type)) {
// Parse complete message and build event
MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(message, MemberStartedEvent.class);
@@ -71,12 +73,13 @@ public class InstanceSpawnedEventProcessor extends TopologyEventProcessor {
event.getMemberId()));
}
+ // Notify event listeners
notifyEventListeners(event);
return true;
} else {
- if (nextMsgProcessor != null) {
+ if (nextProcessor != null) {
// ask the next processor to take care of the message.
- return nextMsgProcessor.process(type, message, topology);
+ return nextProcessor.process(type, message, topology);
} else {
throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
index 0cf39a0..563b2ab 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
@@ -29,18 +29,20 @@ import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
import org.apache.stratos.messaging.util.Util;
-public class MemberActivatedEventProcessor extends TopologyEventProcessor {
+public class MemberActivatedEventProcessor extends MessageProcessor {
private static final Log log = LogFactory.getLog(MemberActivatedEventProcessor.class);
- private MessageProcessor nextMsgProcessor;
+ private MessageProcessor nextProcessor;
@Override
public void setNext(MessageProcessor nextProcessor) {
- nextMsgProcessor = nextProcessor;
+ this.nextProcessor = nextProcessor;
}
@Override
- public boolean processEvent(String type, String message, Topology topology) {
+ public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology)object;
+
if (MemberActivatedEvent.class.getName().equals(type)) {
// Parse complete message and build event
MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(message, MemberActivatedEvent.class);
@@ -93,12 +95,13 @@ public class MemberActivatedEventProcessor extends TopologyEventProcessor {
event.getMemberId()));
}
+ // Notify event listeners
notifyEventListeners(event);
return true;
} else {
- if (nextMsgProcessor != null) {
+ if (nextProcessor != null) {
// ask the next processor to take care of the message.
- return nextMsgProcessor.process(type, message, topology);
+ return nextProcessor.process(type, message, topology);
} else {
throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
index 7d4f482..9745c32 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
@@ -29,75 +29,69 @@ import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
import org.apache.stratos.messaging.util.Util;
-public class MemberStartedEventProcessor extends TopologyEventProcessor {
+public class MemberStartedEventProcessor extends MessageProcessor {
- private static final Log log = LogFactory.getLog(MemberStartedEventProcessor.class);
- private MessageProcessor nextMsgProcessor;
+ private static final Log log = LogFactory.getLog(MemberStartedEventProcessor.class);
+ private MessageProcessor nextProcessor;
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- nextMsgProcessor = nextProcessor;
- }
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
- @Override
- public boolean processEvent(String type, String message, Topology topology) {
- try {
- if (MemberStartedEvent.class.getName().equals(type)) {
- // Parse complete message and build event
- MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(message, MemberStartedEvent.class);
+ @Override
+ public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology) object;
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- throw new RuntimeException(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
- }
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
- }
- Member member = cluster.getMember(event.getMemberId());
- if (member == null) {
- throw new RuntimeException(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- if (member.getStatus() == MemberStatus.Starting) {
- throw new RuntimeException(String.format("Member already started: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
+ if (MemberStartedEvent.class.getName().equals(type)) {
+ // Parse complete message and build event
+ MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(message, MemberStartedEvent.class);
- // Apply changes to the topology
- member.setStatus(MemberStatus.Starting);
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ throw new RuntimeException(String.format("Service does not exist: [service] %s",
+ event.getServiceName()));
+ }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s",
+ event.getServiceName(), event.getClusterId()));
+ }
+ Member member = cluster.getMember(event.getMemberId());
+ if (member == null) {
+ throw new RuntimeException(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+ if (member.getStatus() == MemberStatus.Starting) {
+ throw new RuntimeException(String.format("Member already started: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
- if (log.isInfoEnabled()) {
- log.info(String.format("Member started: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
+ // Apply changes to the topology
+ member.setStatus(MemberStatus.Starting);
- notifyEventListeners(event);
- return true;
- } else {
- if (nextMsgProcessor != null) {
- // ask the next processor to take care of the message.
- return nextMsgProcessor.process(type, message, topology);
- }
- }
- } catch (Exception e) {
- if (nextMsgProcessor != null) {
- // ask the next processor to take care of the message.
- return nextMsgProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process the message: %s of type %s using any of the available processors.",
- message, type));
- }
- }
- return false;
- }
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member started: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
index df0eb9b..a4a5ef6 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
@@ -29,18 +29,20 @@ import org.apache.stratos.messaging.event.topology.MemberSuspendedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
import org.apache.stratos.messaging.util.Util;
-public class MemberSuspendedEventProcessor extends TopologyEventProcessor {
+public class MemberSuspendedEventProcessor extends MessageProcessor {
private static final Log log = LogFactory.getLog(MemberSuspendedEventProcessor.class);
- private MessageProcessor nextMsgProcessor;
+ private MessageProcessor nextProcessor;
@Override
public void setNext(MessageProcessor nextProcessor) {
- nextMsgProcessor = nextProcessor;
+ this.nextProcessor = nextProcessor;
}
@Override
- public boolean processEvent(String type, String message, Topology topology) {
+ public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology) object;
+
if (MemberSuspendedEvent.class.getName().equals(type)) {
// Parse complete message and build event
MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(message, MemberSuspendedEvent.class);
@@ -83,9 +85,9 @@ public class MemberSuspendedEventProcessor extends TopologyEventProcessor {
notifyEventListeners(event);
return true;
} else {
- if (nextMsgProcessor != null) {
+ if (nextProcessor != null) {
// ask the next processor to take care of the message.
- return nextMsgProcessor.process(type, message, topology);
+ return nextProcessor.process(type, message, topology);
} else {
throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
index 7c3b0b5..87862fc 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
@@ -29,76 +29,68 @@ import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
import org.apache.stratos.messaging.util.Util;
-public class MemberTerminatedEventProcessor extends TopologyEventProcessor {
+public class MemberTerminatedEventProcessor extends MessageProcessor {
- private static final Log log = LogFactory.getLog(MemberTerminatedEventProcessor.class);
- private MessageProcessor nextMsgProcessor;
+ private static final Log log = LogFactory.getLog(MemberTerminatedEventProcessor.class);
+ private MessageProcessor nextProcessor;
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- nextMsgProcessor = nextProcessor;
- }
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
- @Override
- public boolean processEvent(String type, String message, Topology topology) {
- try {
- if (MemberTerminatedEvent.class.getName().equals(type)) {
- // Parse complete message and build event
- MemberTerminatedEvent event = (MemberTerminatedEvent) Util.jsonToObject(message, MemberTerminatedEvent.class);
+ @Override
+ public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology) object;
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- throw new RuntimeException(String.format("Service does not exist: [service] %s", event.getServiceName()));
- }
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
- }
- Member member = cluster.getMember(event.getMemberId());
- if (member == null) {
- throw new RuntimeException(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- if (member.getStatus() == MemberStatus.Terminated) {
- throw new RuntimeException(String.format("Member already terminated: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
+ if (MemberTerminatedEvent.class.getName().equals(type)) {
+ // Parse complete message and build event
+ MemberTerminatedEvent event = (MemberTerminatedEvent) Util.jsonToObject(message, MemberTerminatedEvent.class);
- // Apply changes to the topology
- member.setStatus(MemberStatus.Terminated);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member terminated: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ throw new RuntimeException(String.format("Service does not exist: [service] %s", event.getServiceName()));
+ }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s",
+ event.getServiceName(), event.getClusterId()));
+ }
+ Member member = cluster.getMember(event.getMemberId());
+ if (member == null) {
+ throw new RuntimeException(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+ if (member.getStatus() == MemberStatus.Terminated) {
+ throw new RuntimeException(String.format("Member already terminated: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
- notifyEventListeners(event);
- return true;
+ // Apply changes to the topology
+ member.setStatus(MemberStatus.Terminated);
- } else {
- if (nextMsgProcessor != null) {
- // ask the next processor to take care of the message.
- return nextMsgProcessor.process(type, message, topology);
- }
- }
- } catch (Exception e) {
- if (nextMsgProcessor != null) {
- // ask the next processor to take care of the message.
- return nextMsgProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process the message: %s of type %s using any of the available processors.",
- message, type));
- }
- }
- return false;
- }
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member terminated: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+ notifyEventListeners(event);
+ return true;
+
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
index f4cbcc2..911c006 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
@@ -26,18 +26,20 @@ import org.apache.stratos.messaging.event.topology.ServiceCreatedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
import org.apache.stratos.messaging.util.Util;
-public class ServiceCreatedEventProcessor extends TopologyEventProcessor {
+public class ServiceCreatedEventProcessor extends MessageProcessor {
private static final Log log = LogFactory.getLog(ServiceCreatedEventProcessor.class);
- private MessageProcessor nextMsgProcessor;
+ private MessageProcessor nextProcessor;
@Override
public void setNext(MessageProcessor nextProcessor) {
- nextMsgProcessor = nextProcessor;
+ this.nextProcessor = nextProcessor;
}
@Override
- public boolean processEvent(String type, String message, Topology topology) {
+ public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology)object;
+
if (ServiceCreatedEvent.class.getName().equals(type)) {
// Parse complete message and build event
ServiceCreatedEvent event = (ServiceCreatedEvent) Util.jsonToObject(message, ServiceCreatedEvent.class);
@@ -59,9 +61,9 @@ public class ServiceCreatedEventProcessor extends TopologyEventProcessor {
return true;
} else {
- if (nextMsgProcessor != null) {
+ if (nextProcessor != null) {
// ask the next processor to take care of the message.
- return nextMsgProcessor.process(type, message, topology);
+ return nextProcessor.process(type, message, topology);
} else {
throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
index 80c26b8..289d318 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
@@ -26,18 +26,20 @@ import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
import org.apache.stratos.messaging.util.Util;
-public class ServiceRemovedEventProcessor extends TopologyEventProcessor {
+public class ServiceRemovedEventProcessor extends MessageProcessor {
private static final Log log = LogFactory.getLog(ServiceRemovedEventProcessor.class);
- private MessageProcessor nextMsgProcessor;
+ private MessageProcessor nextProcessor;
@Override
public void setNext(MessageProcessor nextProcessor) {
- nextMsgProcessor = nextProcessor;
+ this.nextProcessor = nextProcessor;
}
@Override
- public boolean processEvent(String type, String message, Topology topology) {
+ public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology)object;
+
if (ServiceRemovedEvent.class.getName().equals(type)) {
// Parse complete message and build event
ServiceRemovedEvent event = (ServiceRemovedEvent) Util.jsonToObject(message, ServiceRemovedEvent.class);
@@ -56,13 +58,14 @@ public class ServiceRemovedEventProcessor extends TopologyEventProcessor {
log.info(String.format("Service removed: [service] %s", event.getServiceName()));
}
+ // Notify event listeners
notifyEventListeners(event);
return true;
} else {
- if (nextMsgProcessor != null) {
+ if (nextProcessor != null) {
// ask the next processor to take care of the message.
- return nextMsgProcessor.process(type, message, topology);
+ return nextProcessor.process(type, message, topology);
} else {
throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessor.java
deleted file mode 100644
index 506dc4d..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessor.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.EventObservable;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-
-/**
- *
- */
-public abstract class TopologyEventProcessor extends EventObservable implements MessageProcessor {
-
- @Override
- public boolean process(String type, String message, Object object) {
- return processEvent(type, message, (Topology) object);
- }
-
- protected abstract boolean processEvent(String type, String message, Topology object);
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessorChain.java
new file mode 100644
index 0000000..917f7c9
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessorChain.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+
+/**
+ * Defines the default chain of topology event processors.
+ */
+public class TopologyEventProcessorChain extends MessageProcessorChain {
+ private static final Log log = LogFactory.getLog(TopologyEventProcessorChain.class);
+
+ public void initialize() {
+ // Add topology event processors
+ add(new ServiceCreatedEventProcessor());
+ add(new ServiceRemovedEventProcessor());
+ add(new ClusterCreatedEventProcessor());
+ add(new ClusterRemovedEventProcessor());
+ add(new InstanceSpawnedEventProcessor());
+ add(new MemberStartedEventProcessor());
+ add(new MemberActivatedEventProcessor());
+ add(new MemberSuspendedEventProcessor());
+ add(new MemberTerminatedEventProcessor());
+ if(log.isDebugEnabled()) {
+ log.debug("Topology message processor chain initialized");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
deleted file mode 100644
index 0281930..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
-
-/**
- * Defines default topology message processor chain.
- */
-public class TopologyMessageProcessorChain extends MessageProcessorChain {
- private static final Log log = LogFactory.getLog(TopologyMessageProcessorChain.class);
-
- public void initialize() {
- // Add topology event processors
- add(new ServiceCreatedEventProcessor());
- add(new ServiceRemovedEventProcessor());
- add(new ClusterCreatedEventProcessor());
- add(new ClusterRemovedEventProcessor());
- add(new InstanceSpawnedEventProcessor());
- add(new MemberStartedEventProcessor());
- add(new MemberActivatedEventProcessor());
- add(new MemberSuspendedEventProcessor());
- add(new MemberTerminatedEventProcessor());
- if(log.isDebugEnabled()) {
- log.debug("Topology message processor chain initialized");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
index 9b82b27..6986a28 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
@@ -39,21 +39,21 @@ import org.apache.stratos.messaging.util.Constants;
public class TopologyEventMessageDelegator implements Runnable {
private static final Log log = LogFactory.getLog(TopologyEventMessageDelegator.class);
- private CompleteTopologyEventProcessor completeTopologyEventProcessor;
+ private CompleteTopologyEventProcessor completeTopEvMsgProcessor;
private MessageProcessorChain processorChain;
public TopologyEventMessageDelegator() {
- this.completeTopologyEventProcessor = new CompleteTopologyEventProcessor();
- this.processorChain = new TopologyMessageProcessorChain();
+ this.completeTopEvMsgProcessor = new CompleteTopologyEventProcessor();
+ this.processorChain = new TopologyEventProcessorChain();
}
public TopologyEventMessageDelegator(MessageProcessorChain processorChain) {
- this.completeTopologyEventProcessor = new CompleteTopologyEventProcessor();
+ this.completeTopEvMsgProcessor = new CompleteTopologyEventProcessor();
this.processorChain = processorChain;
}
public void addCompleteTopologyEventListener(EventListener eventListener) {
- completeTopologyEventProcessor.addEventListener(eventListener);
+ completeTopEvMsgProcessor.addEventListener(eventListener);
}
@Override
@@ -72,7 +72,7 @@ public class TopologyEventMessageDelegator implements Runnable {
// Retrieve the actual message
String json = message.getText();
- if (completeTopologyEventProcessor.process(type, json, TopologyManager.getTopology())) {
+ if (completeTopEvMsgProcessor.process(type, json, TopologyManager.getTopology())) {
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a103b79a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java
index fa2b27b..eb0035e 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java
@@ -44,7 +44,9 @@ public class Util {
log.error("Failed to load properties from file: " + filePath, e);
} finally {
try {
- is.close();
+ if(is != null) {
+ is.close();
+ }
} catch (IOException ignore) {
}
}
@@ -88,6 +90,7 @@ public class Util {
return true;
}
catch (NumberFormatException e) {
+ // Not a valid number
}
return false;
}