You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/11/05 13:45:00 UTC
git commit: Generalized message delegator functionality by
introducing a message processor chain model
Updated Branches:
refs/heads/master 5dabed467 -> 5029186d1
Generalized message delegator functionality by introducing a message processor chain model
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/5029186d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/5029186d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/5029186d
Branch: refs/heads/master
Commit: 5029186d1f3b73a8445781915373f8ebb3a3c864
Parents: 5dabed4
Author: Imesh Gunaratne <im...@apache.org>
Authored: Tue Nov 5 18:14:49 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Tue Nov 5 18:14:49 2013 +0530
----------------------------------------------------------------------
.../processor/MessageProcessorChain.java | 60 ++++++++++++++++++++
.../topology/TopologyMessageProcessorChain.java | 47 +++++++++++++++
.../topology/TopologyEventMessageDelegator.java | 57 +++++++++----------
3 files changed, 135 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/5029186d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessorChain.java
new file mode 100644
index 0000000..ba3ccae
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessorChain.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor;
+
+import java.util.LinkedList;
+
+/**
+ * Message processor chain definition.
+ */
+public class MessageProcessorChain {
+    /** Processors in chain order; add() links each one to the processor appended after it. */
+    private final LinkedList<MessageProcessor> list = new LinkedList<MessageProcessor>();
+    /** Creates an empty chain and then calls {@link #initialize()} so subclasses can populate it. */
+    public MessageProcessorChain() {
+        // Overridable call from a constructor: subclass instance fields are not yet set when it runs.
+        initialize();
+    }
+    /** Hook for subclasses to add their processors; this default adds nothing. */
+    protected void initialize() {
+    }
+    /** Appends a processor, first setting it as the next of the current last processor (if any). */
+    public void add(MessageProcessor messageProcessor) {
+        if (!list.isEmpty()) {
+            list.getLast().setNext(messageProcessor);
+        }
+        list.add(messageProcessor);
+    }
+    /** Removes the last processor and clears the next link of the processor that becomes last. */
+    public void removeLast() {
+        list.removeLast();
+        if (!list.isEmpty()) {
+            list.getLast().setNext(null);
+        }
+    }
+    /** Passes the message to the first processor; throws IllegalStateException if there is none. */
+    public boolean process(String type, String message, Object object) {
+        // LinkedList.getFirst() throws on an empty list instead of returning null, so test emptiness.
+        if (list.isEmpty() || list.getFirst() == null) {
+            throw new IllegalStateException("Message processor chain is not initialized");
+        }
+        return list.getFirst().process(type, message, object);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/5029186d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
new file mode 100644
index 0000000..0281930
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+
+/**
+ * Defines default topology message processor chain.
+ */
+public class TopologyMessageProcessorChain extends MessageProcessorChain {
+    private static final Log log = LogFactory.getLog(TopologyMessageProcessorChain.class);
+    /** Adds the topology event processors; the call order below is the order they are chained in. */
+    @Override
+    public void initialize() {
+        add(new ServiceCreatedEventProcessor());
+        add(new ServiceRemovedEventProcessor());
+        add(new ClusterCreatedEventProcessor());
+        add(new ClusterRemovedEventProcessor());
+        add(new InstanceSpawnedEventProcessor());
+        add(new MemberStartedEventProcessor());
+        add(new MemberActivatedEventProcessor());
+        add(new MemberSuspendedEventProcessor());
+        add(new MemberTerminatedEventProcessor());
+        if (log.isDebugEnabled()) {
+            log.debug("Topology message processor chain initialized");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/5029186d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
index 3e05cfc..9b82b27 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
@@ -22,17 +22,39 @@ import javax.jms.TextMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.EventListener;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
import org.apache.stratos.messaging.message.processor.topology.*;
import org.apache.stratos.messaging.util.Constants;
/**
- * Implements the default event processor chain for updating
- * the topology data structure in topology manager.
+ * Implements logic for processing topology event messages using a given
+ * message processor chain.
+ *
+ * Functionality:
+ * - Wait for the complete topology event.
+ * - Process messages using the given message processor chain.
*/
public class TopologyEventMessageDelegator implements Runnable {
private static final Log log = LogFactory.getLog(TopologyEventMessageDelegator.class);
+ private CompleteTopologyEventProcessor completeTopologyEventProcessor;
+ private MessageProcessorChain processorChain;
+
+ public TopologyEventMessageDelegator() {
+ this.completeTopologyEventProcessor = new CompleteTopologyEventProcessor();
+ this.processorChain = new TopologyMessageProcessorChain();
+ }
+
+ public TopologyEventMessageDelegator(MessageProcessorChain processorChain) {
+ this.completeTopologyEventProcessor = new CompleteTopologyEventProcessor();
+ this.processorChain = processorChain;
+ }
+
+ public void addCompleteTopologyEventListener(EventListener eventListener) {
+ completeTopologyEventProcessor.addEventListener(eventListener);
+ }
@Override
public void run() {
@@ -45,14 +67,12 @@ public class TopologyEventMessageDelegator implements Runnable {
try {
// First take the complete topology event
TextMessage message = TopologyEventQueue.getInstance().take();
-
- // retrieve the header
+ // Retrieve the header
String type = message.getStringProperty(Constants.EVENT_CLASS_NAME);
- // retrieve the actual message
+ // Retrieve the actual message
String json = message.getText();
- CompleteTopologyEventProcessor processor = new CompleteTopologyEventProcessor();
- if (processor.process(type, json, TopologyManager.getTopology())) {
+ if (completeTopologyEventProcessor.process(type, json, TopologyManager.getTopology())) {
break;
}
@@ -61,27 +81,6 @@ public class TopologyEventMessageDelegator implements Runnable {
}
}
- // Instantiate all topology event processors
- ServiceCreatedEventProcessor processor1 = new ServiceCreatedEventProcessor();
- ServiceRemovedEventProcessor processor2 = new ServiceRemovedEventProcessor();
- ClusterCreatedEventProcessor processor3 = new ClusterCreatedEventProcessor();
- ClusterRemovedEventProcessor processor4 = new ClusterRemovedEventProcessor();
- InstanceSpawnedEventProcessor processor5 = new InstanceSpawnedEventProcessor();
- MemberStartedEventProcessor processor6 = new MemberStartedEventProcessor();
- MemberActivatedEventProcessor processor7 = new MemberActivatedEventProcessor();
- MemberSuspendedEventProcessor processor8 = new MemberSuspendedEventProcessor();
- MemberTerminatedEventProcessor processor9 = new MemberTerminatedEventProcessor();
-
- // Link above processors in the required order
- processor1.setNext(processor2);
- processor2.setNext(processor3);
- processor3.setNext(processor4);
- processor4.setNext(processor5);
- processor5.setNext(processor6);
- processor6.setNext(processor7);
- processor7.setNext(processor8);
- processor8.setNext(processor9);
-
while (true) {
try {
TextMessage message = TopologyEventQueue.getInstance().take();
@@ -97,7 +96,7 @@ public class TopologyEventMessageDelegator implements Runnable {
try {
TopologyManager.acquireWriteLock();
- processor1.process(type, json, TopologyManager.getTopology());
+ processorChain.process(type, json, TopologyManager.getTopology());
} finally {
TopologyManager.releaseWriteLock();
}