You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/11/05 13:45:00 UTC

git commit: Generalized message delegator functionality by introducing a message processor chain model

Updated Branches:
  refs/heads/master 5dabed467 -> 5029186d1


Generalized message delegator functionality by introducing a message processor chain model


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/5029186d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/5029186d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/5029186d

Branch: refs/heads/master
Commit: 5029186d1f3b73a8445781915373f8ebb3a3c864
Parents: 5dabed4
Author: Imesh Gunaratne <im...@apache.org>
Authored: Tue Nov 5 18:14:49 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Tue Nov 5 18:14:49 2013 +0530

----------------------------------------------------------------------
 .../processor/MessageProcessorChain.java        | 60 ++++++++++++++++++++
 .../topology/TopologyMessageProcessorChain.java | 47 +++++++++++++++
 .../topology/TopologyEventMessageDelegator.java | 57 +++++++++----------
 3 files changed, 135 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/5029186d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessorChain.java
new file mode 100644
index 0000000..ba3ccae
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/MessageProcessorChain.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor;
+
+import java.util.LinkedList;
+
+/**
+ * Message processor chain definition.
+ *
+ * Keeps an ordered list of {@link MessageProcessor} instances and links each
+ * one to its successor through {@code setNext}. {@link #process} hands a
+ * message to the first processor only; how it travels further along the chain
+ * is decided by the MessageProcessor implementations.
+ *
+ * Sub classes populate the chain by overriding {@link #initialize()}.
+ */
+public class MessageProcessorChain {
+
+    private final LinkedList<MessageProcessor> list;
+
+    /**
+     * Creates an empty chain and then invokes {@link #initialize()}.
+     *
+     * Note: initialize() runs before the instance field initializers and the
+     * constructor body of a sub class, so overrides must not rely on sub class
+     * instance state.
+     */
+    public MessageProcessorChain() {
+        list = new LinkedList<MessageProcessor>();
+        initialize();
+    }
+
+    /**
+     * Hook for sub classes to add their processors. Does nothing by default.
+     */
+    protected void initialize() {
+    }
+
+    /**
+     * Appends a processor to the end of the chain and links the current last
+     * processor (if any) to it.
+     *
+     * @param messageProcessor processor to append
+     */
+    public void add(MessageProcessor messageProcessor) {
+        if(!list.isEmpty()) {
+            list.getLast().setNext(messageProcessor);
+        }
+        list.add(messageProcessor);
+    }
+
+    /**
+     * Removes the last processor and clears the next reference of the
+     * processor that becomes the new end of the chain.
+     *
+     * @throws java.util.NoSuchElementException if the chain is empty
+     */
+    public void removeLast() {
+        list.removeLast();
+        if(!list.isEmpty()) {
+            list.getLast().setNext(null);
+        }
+    }
+
+    /**
+     * Hands the message to the first processor in the chain.
+     *
+     * @param type    message type, passed through to the processor
+     * @param message message body, passed through to the processor
+     * @param object  object passed through to the processor
+     * @return the value returned by the first processor
+     * @throws IllegalStateException if the chain contains no processor
+     */
+    public boolean process(String type, String message, Object object) {
+        // peekFirst() returns null for an empty list, whereas getFirst() throws
+        // NoSuchElementException before the check below could ever be reached.
+        MessageProcessor root = list.peekFirst();
+        if(root == null) {
+            throw new IllegalStateException("Message processor chain is not initialized");
+        }
+        return root.process(type, message, object);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/5029186d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
new file mode 100644
index 0000000..0281930
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+
+/**
+ * Defines default topology message processor chain.
+ *
+ * The processors are added in a fixed order: service created/removed,
+ * cluster created/removed, instance spawned, then member started, activated,
+ * suspended and terminated.
+ */
+public class TopologyMessageProcessorChain extends MessageProcessorChain {
+    private static final Log log = LogFactory.getLog(TopologyMessageProcessorChain.class);
+
+    /**
+     * Populates the chain with the topology event processors.
+     *
+     * Marked with Override so that the compiler reports an error if the hook
+     * signature in the base class changes, instead of this method silently
+     * no longer being called.
+     */
+    @Override
+    public void initialize() {
+        // Add topology event processors
+        add(new ServiceCreatedEventProcessor());
+        add(new ServiceRemovedEventProcessor());
+        add(new ClusterCreatedEventProcessor());
+        add(new ClusterRemovedEventProcessor());
+        add(new InstanceSpawnedEventProcessor());
+        add(new MemberStartedEventProcessor());
+        add(new MemberActivatedEventProcessor());
+        add(new MemberSuspendedEventProcessor());
+        add(new MemberTerminatedEventProcessor());
+        if(log.isDebugEnabled()) {
+            log.debug("Topology message processor chain initialized");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/5029186d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
index 3e05cfc..9b82b27 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
@@ -22,17 +22,39 @@ import javax.jms.TextMessage;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.EventListener;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
 import org.apache.stratos.messaging.message.processor.topology.*;
 import org.apache.stratos.messaging.util.Constants;
 
 
 /**
- * Implements the default event processor chain for updating
- * the topology data structure in topology manager.
+ * Implements logic for processing topology event messages using a given
+ * topology message processor chain.
+ *
+ * Functionality:
+ * - Wait for the complete topology event.
+ * - Process messages using the given message processor chain.
  */
 public class TopologyEventMessageDelegator implements Runnable {
 
     private static final Log log = LogFactory.getLog(TopologyEventMessageDelegator.class);
+    private CompleteTopologyEventProcessor completeTopologyEventProcessor;
+    private MessageProcessorChain processorChain;
+
+    /**
+     * Creates a delegator that uses a new TopologyMessageProcessorChain
+     * to process topology event messages.
+     */
+    public TopologyEventMessageDelegator() {
+        completeTopologyEventProcessor = new CompleteTopologyEventProcessor();
+        processorChain = new TopologyMessageProcessorChain();
+    }
+
+    /**
+     * Creates a delegator that uses the given chain to process topology
+     * event messages.
+     *
+     * @param processorChain message processor chain to process messages with
+     */
+    public TopologyEventMessageDelegator(MessageProcessorChain processorChain) {
+        // Storing the reference has no side effects, so it can safely come first.
+        this.processorChain = processorChain;
+        completeTopologyEventProcessor = new CompleteTopologyEventProcessor();
+    }
+
+    /**
+     * Registers an event listener with the complete topology event processor.
+     *
+     * @param eventListener listener to register
+     */
+    public void addCompleteTopologyEventListener(EventListener eventListener) {
+        this.completeTopologyEventProcessor.addEventListener(eventListener);
+    }
 
     @Override
     public void run() {
@@ -45,14 +67,12 @@ public class TopologyEventMessageDelegator implements Runnable {
                 try {
                     // First take the complete topology event
                     TextMessage message = TopologyEventQueue.getInstance().take();
-
-                    // retrieve the header
+                    // Retrieve the header
                     String type = message.getStringProperty(Constants.EVENT_CLASS_NAME);
-                    // retrieve the actual message
+                    // Retrieve the actual message
                     String json = message.getText();
 
-                    CompleteTopologyEventProcessor processor = new CompleteTopologyEventProcessor();
-                    if (processor.process(type, json, TopologyManager.getTopology())) {
+                    if (completeTopologyEventProcessor.process(type, json, TopologyManager.getTopology())) {
                         break;
                     }
 
@@ -61,27 +81,6 @@ public class TopologyEventMessageDelegator implements Runnable {
                 }
             }
 
-            // Instantiate all topology event processors
-            ServiceCreatedEventProcessor processor1 = new ServiceCreatedEventProcessor();
-            ServiceRemovedEventProcessor processor2 = new ServiceRemovedEventProcessor();
-            ClusterCreatedEventProcessor processor3 = new ClusterCreatedEventProcessor();
-            ClusterRemovedEventProcessor processor4 = new ClusterRemovedEventProcessor();
-            InstanceSpawnedEventProcessor processor5 = new InstanceSpawnedEventProcessor();
-            MemberStartedEventProcessor processor6 = new MemberStartedEventProcessor();
-            MemberActivatedEventProcessor processor7 = new MemberActivatedEventProcessor();
-            MemberSuspendedEventProcessor processor8 = new MemberSuspendedEventProcessor();
-            MemberTerminatedEventProcessor processor9 = new MemberTerminatedEventProcessor();
-
-            // Link above processors in the required order
-            processor1.setNext(processor2);
-            processor2.setNext(processor3);
-            processor3.setNext(processor4);
-            processor4.setNext(processor5);
-            processor5.setNext(processor6);
-            processor6.setNext(processor7);
-            processor7.setNext(processor8);
-            processor8.setNext(processor9);
-
             while (true) {
                 try {
                     TextMessage message = TopologyEventQueue.getInstance().take();
@@ -97,7 +96,7 @@ public class TopologyEventMessageDelegator implements Runnable {
 
                     try {
                         TopologyManager.acquireWriteLock();
-                        processor1.process(type, json, TopologyManager.getTopology());
+                        processorChain.process(type, json, TopologyManager.getTopology());
                     } finally {
                         TopologyManager.releaseWriteLock();
                     }