You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ch...@apache.org on 2011/02/20 06:56:25 UTC

svn commit: r1072507 - in /synapse/trunk/java/modules/core/src/main/java/org/apache/synapse: config/ config/xml/ mediators/store/

Author: charith
Date: Sun Feb 20 05:56:25 2011
New Revision: 1072507

URL: http://svn.apache.org/viewvc?rev=1072507&view=rev
Log:
Making the message processor a top-level element

Added:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorFactory.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorSerializer.java
Modified:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreSerializer.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationFactory.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationSerializer.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java?rev=1072507&r1=1072506&r2=1072507&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java Sun Feb 20 05:56:25 2011
@@ -24,6 +24,7 @@ import org.apache.axis2.engine.AxisConfi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.*;
+import org.apache.synapse.message.processors.MessageProcessor;
 import org.apache.synapse.message.store.MessageStore;
 import org.apache.synapse.deployers.SynapseArtifactDeploymentStore;
 import org.apache.synapse.commons.datasource.DataSourceRepositoryHolder;
@@ -131,6 +132,11 @@ public class SynapseConfiguration implem
     private Map<String, MessageStore> messageStores = new ConcurrentHashMap<String, MessageStore>();
 
     /**
+     * Message processors in the synapse configuration
+     */
+    private Map<String , MessageProcessor> messageProcessors = new ConcurrentHashMap<String ,MessageProcessor>();
+
+    /**
      * Description/documentation of the configuration
      */
     private String description = null;
@@ -1139,7 +1145,12 @@ public class SynapseConfiguration implem
 
         //initialize message stores
         for(MessageStore messageStore : messageStores.values()) {
-            ((ManagedLifecycle) messageStore).init(se);
+            messageStore.init(se);
+        }
+
+        // initialize message processors
+        for(MessageProcessor messageProcessor : messageProcessors.values()) {
+            messageProcessor.init(se);
         }
     }
 
@@ -1287,7 +1298,7 @@ public class SynapseConfiguration implem
         if (!messageStores.containsKey(name)){
             messageStores.put(name,messageStore);
         } else {
-            handleException("Duplicate message store by the name: " + name);
+            handleException("Duplicate message store : " + name);
         }
     }
 
@@ -1310,6 +1321,36 @@ public class SynapseConfiguration implem
     }
 
     /**
+     * Registers a message processor in the synapse configuration under the given name.
+     * @param name name the message processor is registered under; must not be in use already
+     * @param processor message processor instance to register
+     */
+    public void addMessageProcessor(String name , MessageProcessor processor) {
+        if (!messageProcessors.containsKey(name)) { // the map is keyed by name, not by instance
+            messageProcessors.put(name , processor);
+        } else {
+            handleException("Duplicate Message Processor " + name);
+        }
+    }
+
+    /**
+     * Returns the message processors registered in this Synapse configuration.
+     * @return the map of message processors, keyed by processor name
+     */
+    public Map<String, MessageProcessor> getMessageProcessors() {
+        return this.messageProcessors;
+    }
+
+    /**
+     * Removes the message processor registered under the given name.
+     * @param name name of the message processor to remove
+     * @return the removed message processor, or null if none was registered under that name
+     */
+    public MessageProcessor removeMessageProcessor(String name) {
+        return this.messageProcessors.remove(name);
+    }
+
+    /**
      * Sets the description of the configuration
      *
      * @param description tobe set to the artifact

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorFactory.java?rev=1072507&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorFactory.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorFactory.java Sun Feb 20 05:56:25 2011
@@ -0,0 +1,134 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.config.xml;
+
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.message.processors.MessageProcessor;
+
+import javax.xml.namespace.QName;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+
+/**
+ * Creates an instance of the configured Message processor and sets its properties.
+ * <p/>
+ * &lt;messageProcessor name="string" class="classname" messageStore="string"&gt;
+ * &lt;parameter name="string"&gt;"string"&lt;/parameter&gt;
+ * &lt;parameter name="string"&gt;"string"&lt;/parameter&gt;
+ * &lt;parameter name="string"&gt;"string"&lt;/parameter&gt;
+ * .
+ * .
+ * &lt;/messageProcessor&gt;
+ */
+public class MessageProcessorFactory {
+    private static final Log log = LogFactory.getLog(MessageProcessorFactory.class);
+    public static final QName CLASS_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "class");
+    public static final QName NAME_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "name");
+    public static final QName PARAMETER_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE,
+            "parameter");
+    public static final QName MESSAGE_STORE_Q = new QName(XMLConfigConstants.NULL_NAMESPACE ,
+            "messageStore");
+    private static final QName DESCRIPTION_Q
+            = new QName(SynapseConstants.SYNAPSE_NAMESPACE, "description");
+
+    /**
+     * Creates a Message processor instance from the given XML configuration element.
+     * @param elem OMElement that contains the Message processor configuration
+     * @return created message processor instance
+     * @throws SynapseException if the configuration is invalid or the class cannot be instantiated
+     */
+    public static MessageProcessor createMessageProcessor(OMElement elem) {
+        OMAttribute clssAtt = elem.getAttribute(CLASS_Q);
+        if (clssAtt == null) {
+            // handleException always throws; there is no default processor to fall back to
+            handleException("Can't create Message processor without a provider class");
+        }
+
+        MessageProcessor processor = null;
+        try {
+            Class<?> cls = Class.forName(clssAtt.getAttributeValue());
+            processor = (MessageProcessor) cls.newInstance();
+        } catch (Exception e) {
+            // pass the cause along so the original stack trace is logged and chained
+            handleException("Error while creating Message processor " + e.getMessage(), e);
+        }
+
+        OMAttribute nameAtt = elem.getAttribute(NAME_Q);
+        if (nameAtt == null) {
+            handleException("Can't create Message processor without a name ");
+        }
+        processor.setName(nameAtt.getAttributeValue());
+
+        OMAttribute storeAtt = elem.getAttribute(MESSAGE_STORE_Q);
+        if (storeAtt == null) {
+            handleException("Can't create Message processor without a message store");
+        }
+        processor.setMessageStoreName(storeAtt.getAttributeValue());
+
+        OMElement descriptionElem = elem.getFirstChildWithName(DESCRIPTION_Q);
+        if (descriptionElem != null) {
+            processor.setDescription(descriptionElem.getText());
+        }
+
+        processor.setParameters(getParameters(elem));
+        return processor;
+    }
+
+    /**
+     * Collects the parameter child elements of the given element, keyed by parameter name.
+     * @param elem Message processor configuration element
+     * @return map of parameter names to their text values; empty if there are no parameters
+     */
+    private static Map<String, Object> getParameters(OMElement elem) {
+        Map<String, Object> parameters = new HashMap<String, Object>();
+        Iterator<?> params = elem.getChildrenWithName(PARAMETER_Q);
+        while (params.hasNext()) {
+            Object o = params.next();
+            if (o instanceof OMElement) {
+                OMElement prop = (OMElement) o;
+                OMAttribute paramName = prop.getAttribute(NAME_Q);
+                if (paramName == null) {
+                    handleException("Invalid Message processor parameter - Parameter must have a name ");
+                }
+                String paramValue = prop.getText();
+                if (paramValue != null) {
+                    parameters.put(paramName.getAttributeValue(), paramValue);
+                }
+            }
+        }
+        return parameters;
+    }
+
+    private static void handleException(String msg) {
+        log.error(msg);
+        throw new SynapseException(msg);
+    }
+
+    private static void handleException(String msg, Exception e) {
+        log.error(msg, e);
+        throw new SynapseException(msg, e);
+    }
+}

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorSerializer.java?rev=1072507&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorSerializer.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorSerializer.java Sun Feb 20 05:56:25 2011
@@ -0,0 +1,123 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.config.xml;
+
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.message.processors.MessageProcessor;
+
+import javax.xml.namespace.QName;
+import java.util.Iterator;
+
+
+/**
+ * Serializes a Message processor instance into its XML configuration.
+ * <p/>
+ * &lt;messageProcessor name="string" class="classname" messageStore="string"&gt;
+ * &lt;parameter name="string"&gt;"string"&lt;/parameter&gt;
+ * &lt;parameter name="string"&gt;"string"&lt;/parameter&gt;
+ * &lt;parameter name="string"&gt;"string"&lt;/parameter&gt;
+ * .
+ * .
+ * &lt;/messageProcessor&gt;
+ */
+public class MessageProcessorSerializer {
+
+    private static final Log log = LogFactory.getLog(MessageProcessorSerializer.class);
+
+    protected static final OMFactory fac = OMAbstractFactory.getOMFactory();
+    protected static final OMNamespace synNS = SynapseConstants.SYNAPSE_OMNAMESPACE;
+    protected static final OMNamespace nullNS = fac.createOMNamespace(
+            XMLConfigConstants.NULL_NAMESPACE, "");
+
+    /**
+     * Serializes the given Message processor instance to its XML configuration.
+     * @param parent element the result is added to as a child; may be null
+     * @param processor message processor instance
+     * @return created XML configuration
+     * @throws SynapseException if the processor is null or has no name
+     */
+    public static OMElement serializeMessageProcessor(OMElement parent, MessageProcessor processor) {
+        if (processor == null) {
+            handleException("Invalid processor. Provider is required");
+        }
+        if (processor.getName() == null) {
+            handleException("Message processor Name not specified");
+        }
+
+        OMElement processorElem = fac.createOMElement("messageProcessor", synNS);
+        processorElem.addAttribute(fac.createOMAttribute("class", nullNS,
+                processor.getClass().getName()));
+        processorElem.addAttribute(fac.createOMAttribute("name", nullNS, processor.getName()));
+
+        if (processor.getMessageStoreName() != null) {
+            processorElem.addAttribute(fac.createOMAttribute(
+                    "messageStore", nullNS, processor.getMessageStoreName()));
+        }
+
+        if (processor.getParameters() != null) {
+            for (String name : processor.getParameters().keySet()) {
+                Object value = processor.getParameters().get(name);
+                if (value == null) {
+                    // a parameter without a value has no text to serialize
+                    continue;
+                }
+                OMElement property = fac.createOMElement("parameter", synNS);
+                property.addAttribute(fac.createOMAttribute("name", nullNS, name));
+                // values are stored as Object, so do not assume they are Strings
+                property.setText(value.toString().trim());
+                processorElem.addChild(property);
+            }
+        }
+
+        // build the description element once and reuse it
+        OMElement descriptionElem = getSerializedDescription(processor);
+        if (descriptionElem != null) {
+            processorElem.addChild(descriptionElem);
+        }
+
+        if (parent != null) {
+            parent.addChild(processorElem);
+        }
+        return processorElem;
+    }
+
+    private static void handleException(String msg) {
+        log.error(msg);
+        throw new SynapseException(msg);
+    }
+
+    // Returns the description element, or null when the processor has no description
+    private static OMElement getSerializedDescription(MessageProcessor processor) {
+        if (processor.getDescription() == null) {
+            return null;
+        }
+        OMElement descriptionElem = fac.createOMElement(
+                new QName(SynapseConstants.SYNAPSE_NAMESPACE, "description"));
+        descriptionElem.setText(processor.getDescription());
+        return descriptionElem;
+    }
+
+}

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java?rev=1072507&r1=1072506&r2=1072507&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java Sun Feb 20 05:56:25 2011
@@ -42,8 +42,6 @@ import java.util.Properties;
  * Create an instance of the given Message Store, and sets properties on it.
  * <p/>
  * &lt;messageStore name="string" class="classname" [sequence = "string" ]&gt;
- * &lt;<processor class="classname">&gt;
- * &lt;</processor>&gt;
  * &lt;parameter name="string"&gt"string" &lt;parameter&gt;
  * &lt;parameter name="string"&gt"string" &lt;parameter&gt;
  * &lt;parameter name="string"&gt"string" &lt;parameter&gt;
@@ -59,7 +57,6 @@ public class MessageStoreFactory {
     public static final QName NAME_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "name");
     public static final QName SEQUENCE_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "sequence");
 
-    public static final QName PROCESSOR_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "processor");
     public static final QName PARAMETER_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE,
             "parameter");
     private static final QName DESCRIPTION_Q
@@ -104,22 +101,7 @@ public class MessageStoreFactory {
 
         messageStore.setParameters(getParameters(elem));
 
-        OMElement processorElm = elem.getFirstChildWithName(PROCESSOR_Q);
-        MessageProcessor processor = null;
-        if (processorElm != null) {
-            processor = populateMessageProcessor(processorElm);
 
-            processor.setMessageStore(messageStore);
-        } else {
-            log.warn("Creating a Message Store without a Message processor ");
-        }
-
-        if(processor != null) {
-            messageStore.setMessageProcessor(processor);
-            processor.setMessageStore(messageStore);
-        } else {
-            log.warn("Message Store Created with out a Message processor. ");
-        }
 
         log.info("Successfully created Message Store" + nameAtt.getAttributeValue());
         return messageStore;
@@ -148,53 +130,6 @@ public class MessageStoreFactory {
         return parameters;
     }
 
-    /**
-     * Populate the Message Processor
-     *
-     * @return
-     */
-    private static MessageProcessor populateMessageProcessor(OMElement element) {
-        OMAttribute classAtt = element.getAttribute(CLASS_Q);
-        MessageProcessor processor = null;
-        if (classAtt != null) {
-            String className = classAtt.getAttributeValue();
-            try {
-                Class cls = Class.forName(className);
-                if (cls != null) {
-                    processor = (MessageProcessor) cls.newInstance();
-                    Iterator params = element.getChildrenWithName(PARAMETER_Q);
-                    Map<String, Object> parameters = new HashMap<String, Object>();
-
-                    while (params.hasNext()) {
-                        Object o = params.next();
-                        if (o instanceof OMElement) {
-                            OMElement prop = (OMElement) o;
-                            OMAttribute paramName = prop.getAttribute(NAME_Q);
-                            String paramValue = prop.getText();
-                            if (paramName != null) {
-                                if (paramValue != null) {
-                                    parameters.put(paramName.getAttributeValue(), paramValue);
-                                }
-                            } else {
-                                handleException("Invalid Message Processor parameter - Parameter must have a name ");
-                            }
-                        }
-                    }
-                    processor.setParameters(parameters);
-                } else {
-                    throw new SynapseException("Can't find Class " + className);
-                }
-            } catch (Exception e) {
-                log.error("Error while Creating MessageProcessor " + e.getMessage());
-                throw new SynapseException(e);
-            }
-        } else {
-            log.warn("Creating Message Store with out a Message processor processor");
-        }
-
-        return processor;
-    }
-
     private static void handleException(String msg) {
         log.error(msg);
         throw new SynapseException(msg);

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreSerializer.java?rev=1072507&r1=1072506&r2=1072507&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreSerializer.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreSerializer.java Sun Feb 20 05:56:25 2011
@@ -27,13 +27,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.SynapseConstants;
 import org.apache.synapse.SynapseException;
-import org.apache.synapse.message.processors.MessageProcessor;
-import org.apache.synapse.message.store.MessageStore;
 import org.apache.synapse.message.store.InMemoryMessageStore;
+import org.apache.synapse.message.store.MessageStore;
 
 import javax.xml.namespace.QName;
 import java.util.Iterator;
-import java.util.Map;
 
 /**
  * Serialize an instance of the given Message Store, and sets properties on it.
@@ -57,10 +55,10 @@ public class MessageStoreSerializer {
 
         OMElement store = fac.createOMElement("messageStore", synNS);
 
-        if (messageStore.getProviderClass() != null) {
-            if (!messageStore.getProviderClass().equals(InMemoryMessageStore.class.getName())) {
+        if (messageStore != null) {
+            if (!messageStore.getClass().getName().equals(InMemoryMessageStore.class.getName())) {
                 store.addAttribute(fac.createOMAttribute("class", nullNS,
-                        messageStore.getProviderClass()));
+                        messageStore.getClass().getName()));
             }
         } else {
             handleException("Invalid MessageStore. Provider is required");
@@ -89,9 +87,7 @@ public class MessageStoreSerializer {
             }
         }
 
-        if(messageStore.getMessageProcessor() != null) {
-            populateProcessorElem(store,fac,messageStore);
-        }
+
         if (getSerializedDescription(messageStore) != null) {
             store.addChild(getSerializedDescription(messageStore));
         }
@@ -114,23 +110,6 @@ public class MessageStoreSerializer {
         }
     }
 
-    private static void populateProcessorElem(OMElement storeElem ,OMFactory factory,
-                                              MessageStore messageStore) {
-        MessageProcessor processor = messageStore.getMessageProcessor();
-        OMElement processorElem = factory.createOMElement("processor" , synNS);
-        Map<String ,Object> parameters = processor.getParameters();
-
-        Iterator<Map.Entry<String ,Object>> it = parameters.entrySet().iterator();
-        while (it.hasNext()) {
-            Map.Entry<String,Object> entry = it.next();
-            OMElement paramElem = factory.createOMElement("parameter", synNS);
-            paramElem.addAttribute(fac.createOMAttribute("name", nullNS, entry.getKey()));
-            paramElem.setText((String)entry.getValue());
-            processorElem.addChild(paramElem);
-        }
-        storeElem.addChild(processorElem);
-    }
-
     private static void handleException(String msg) {
         log.error(msg);
         throw new SynapseException(msg);

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationFactory.java?rev=1072507&r1=1072506&r2=1072507&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationFactory.java Sun Feb 20 05:56:25 2011
@@ -26,6 +26,7 @@ import org.apache.synapse.Mediator;
 import org.apache.synapse.Startup;
 import org.apache.synapse.SynapseConstants;
 import org.apache.synapse.SynapseException;
+import org.apache.synapse.message.processors.MessageProcessor;
 import org.apache.synapse.message.store.MessageStore;
 import org.apache.synapse.commons.executors.PriorityExecutor;
 import org.apache.synapse.commons.executors.config.PriorityExecutorFactory;
@@ -90,7 +91,9 @@ public class SynapseXMLConfigurationFact
                     defineExecutor(config, elt, properties);
                 } else if(XMLConfigConstants.MESSAGE_STORE_ELT.equals(elt.getQName())) {
                     defineMessageStore(config, elt, properties);
-                } else if (StartupFinder.getInstance().isStartup(elt.getQName())) {
+                } else if (XMLConfigConstants.MESSAGE_PROCESSOR_ELT.equals(elt.getQName())){
+                    defineMessageProcessor(config,elt,properties);
+                }else if (StartupFinder.getInstance().isStartup(elt.getQName())) {
                     defineStartup(config, elt, properties);
                 } else if (XMLConfigConstants.DESCRIPTION_ELT.equals(elt.getQName())) {
                     config.setDescription(elt.getText());
@@ -288,6 +291,13 @@ public class SynapseXMLConfigurationFact
         return messageStore;
     }
 
+    public static MessageProcessor defineMessageProcessor(SynapseConfiguration config,
+                                                          OMElement elem, Properties properties) {
+        MessageProcessor created = MessageProcessorFactory.createMessageProcessor(elem);
+        config.addMessageProcessor(created.getName(), created); // registered under its own name
+        return created;
+    }
+
     private static void handleException(String msg) {
         log.error(msg);
         throw new SynapseException(msg);

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationSerializer.java?rev=1072507&r1=1072506&r2=1072507&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationSerializer.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationSerializer.java Sun Feb 20 05:56:25 2011
@@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.synapse.Startup;
 import org.apache.synapse.SynapseConstants;
 import org.apache.synapse.SynapseException;
+import org.apache.synapse.message.processors.MessageProcessor;
 import org.apache.synapse.message.store.MessageStore;
 import org.apache.synapse.commons.executors.PriorityExecutor;
 import org.apache.synapse.commons.executors.config.PriorityExecutorSerializer;
@@ -131,6 +132,8 @@ public class SynapseXMLConfigurationSeri
 
        // Message stores
         serializeMessageStores(definitions, synCfg.getMessageStores());        
+       //Message Processors
+        serializeMessageProcessors(definitions,synCfg.getMessageProcessors());
 
         return definitions;
     }
@@ -181,6 +184,13 @@ public class SynapseXMLConfigurationSeri
         }        
     }
 
+    private static void serializeMessageProcessors(OMElement definitions,
+                                               Map<String, MessageProcessor> processorMap ){
+        for (MessageProcessor processor : processorMap.values()) { // each is added to definitions
+            MessageProcessorSerializer.serializeMessageProcessor(definitions, processor);
+        }
+    }
+
     private static void handleException(String msg) {
         log.error(msg);
         throw new SynapseException(msg);

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java?rev=1072507&r1=1072506&r2=1072507&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java Sun Feb 20 05:56:25 2011
@@ -69,6 +69,8 @@ public class XMLConfigConstants {
     public static final QName PROXY_ELT       = new QName(SYNAPSE_NAMESPACE, "proxy");
     public static final QName EVENT_SOURCE_ELT = new QName(SYNAPSE_NAMESPACE, "eventSource");
     public static final QName MESSAGE_STORE_ELT = new QName(SYNAPSE_NAMESPACE, "messageStore");
+    public static final QName MESSAGE_PROCESSOR_ELT = new QName(SYNAPSE_NAMESPACE ,
+                                                                                "messageProcessor");
     public static final String NULL_NAMESPACE = "";
     public static final Object QUARTZ_QNAME   =
         new QName("http://www.opensymphony.com/quartz/JobSchedulingData", "quartz");

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java?rev=1072507&r1=1072506&r2=1072507&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java Sun Feb 20 05:56:25 2011
@@ -54,7 +54,7 @@ public class MessageStoreMediator extend
                         sequence.mediate(synCtx);
                     }
                 }
-                messageStore.store(synCtx);
+                messageStore.offer(synCtx);
 
                 // with the nio transport, this causes the listener not to write a 202
                 // Accepted response, as this implies that Synapse does not yet know if