You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ch...@apache.org on 2011/02/11 07:16:59 UTC

svn commit: r1069682 - in /synapse/trunk/java/modules/core/src: main/java/org/apache/synapse/ main/java/org/apache/synapse/config/xml/ main/java/org/apache/synapse/endpoints/ main/java/org/apache/synapse/mediators/store/ main/java/org/apache/synapse/me...

Author: charith
Date: Fri Feb 11 06:16:58 2011
New Revision: 1069682

URL: http://svn.apache.org/viewvc?rev=1069682&view=rev
Log:
refactoring synapse message store

Added:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreMediatorFactory.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelViewMBean.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
Removed:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/RedeliveryProcessor.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/StorableMessage.java
Modified:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreSerializer.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/AbstractEndpoint.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreView.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreViewMBean.java
    synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/MessageStoreSerializationTest.java

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java?rev=1069682&r1=1069681&r2=1069682&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java Fri Feb 11 06:16:58 2011
@@ -402,13 +402,6 @@ public final class SynapseConstants {
     public static final int CALLOUT_OPERATION_FAILED    = 401000;
 
 
-    /**Message Context properties used for Message Store*/
-
-    // Number of attempts that has been tried to redeliver the message
-    public static final String MESSAGE_STORE_REDELIVERY_COUNT = "message.store.redelivery.count";
-
-    // Does message as attemted all its redelivered
-    public static final String MESSAGE_STORE_REDELIVERED = "message.store.redelivery.redelivered";
 
     // Fail-safe mode properties
     public static final String FAIL_SAFE_MODE_STATUS = "failsafe.mode.enable";

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java?rev=1069682&r1=1069681&r2=1069682&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java Fri Feb 11 06:16:58 2011
@@ -79,7 +79,8 @@ public class MediatorFactoryFinder imple
         ConditionalRouterMediatorFactory.class,
         SamplingThrottleMediatorFactory.class,
         URLRewriteMediatorFactory.class,
-        EnrichMediatorFactory.class    
+        EnrichMediatorFactory.class,
+        MessageStoreMediatorFactory.class
     };
 
     private final static MediatorFactoryFinder instance  = new MediatorFactoryFinder();

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java?rev=1069682&r1=1069681&r2=1069682&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java Fri Feb 11 06:16:58 2011
@@ -25,12 +25,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.SynapseException;
 import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.message.processors.MessageProcessor;
 import org.apache.synapse.message.store.InMemoryMessageStore;
 import org.apache.synapse.message.store.MessageStore;
-import org.apache.synapse.message.store.RedeliveryProcessor;
 import org.apache.axis2.util.JavaUtils;
 
 
+import javax.xml.XMLConstants;
 import javax.xml.namespace.QName;
 import java.util.Iterator;
 import java.util.Map;
@@ -40,11 +41,14 @@ import java.util.Properties;
 /**
  * Create an instance of the given Message Store, and sets properties on it.
  * <p/>
- * &lt;messageStore name="string" class="classname" [sequence = "string" ] &gt;
- * &lt;redelivery&gt;
- * &lt;interval&gt;delay in seconds &lt;/interval&gt;
- * &lt;maximumRedeliveries&gt;maximum_number_of_redeliveries_to attempt  &lt;/maximumRedeliveries&gt;
- * &lt;/redelivery&gt;
+ * &lt;messageStore name="string" class="classname" [sequence = "string" ]&gt;
+ * &lt;processor class="classname"&gt;
+ * &lt;/processor&gt;
+ * &lt;parameter name="string"&gt;"string" &lt;/parameter&gt;
+ * &lt;parameter name="string"&gt;"string" &lt;/parameter&gt;
+ * &lt;parameter name="string"&gt;"string" &lt;/parameter&gt;
+ * .
+ * .
  * &lt;/messageStore&gt;
  */
 public class MessageStoreFactory {
@@ -55,20 +59,13 @@ public class MessageStoreFactory {
     public static final QName NAME_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "name");
     public static final QName SEQUENCE_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "sequence");
 
-    private static final QName REDELIVERY_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE,
-            "redelivery");
-    private static final QName DELAY_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "interval");
-    private static final QName MAX_REDELIVERIES = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE,
-            "maximumRedeliveries");
-    private static final QName ENABLE_EXPONENTIAL_BACKOFF = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE,
-            "exponentialBackoff");
-    private static final QName BACKOFF_MULTIPLIER = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE,
-            "backoffMutiplier");
+    public static final QName PROCESSOR_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "processor");
     public static final QName PARAMETER_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE,
             "parameter");
     private static final QName DESCRIPTION_Q
             = new QName(SynapseConstants.SYNAPSE_NAMESPACE, "description");
 
+
     @SuppressWarnings({"UnusedDeclaration"})
     public static MessageStore createMessageStore(OMElement elem, Properties properties) {
 
@@ -86,7 +83,9 @@ public class MessageStoreFactory {
             messageStore = new InMemoryMessageStore();
         }
 
+
         OMAttribute nameAtt = elem.getAttribute(NAME_Q);
+
         if (nameAtt != null) {
             messageStore.setName(nameAtt.getAttributeValue());
         } else {
@@ -94,62 +93,35 @@ public class MessageStoreFactory {
         }
 
         OMAttribute sequenceAtt = elem.getAttribute(SEQUENCE_Q);
-        if(sequenceAtt != null) {
+        if (sequenceAtt != null) {
             messageStore.setSequence(sequenceAtt.getAttributeValue());
         }
 
-        OMElement redeliveryElem = elem.getFirstChildWithName(REDELIVERY_Q);
-
-        if (redeliveryElem != null) {
-            RedeliveryProcessor redeliveryProcessor = populateRedeliveryProcessor(redeliveryElem,
-                    messageStore);
-            messageStore.setRedeliveryProcessor(redeliveryProcessor);
-        }
-
         OMElement descriptionElem = elem.getFirstChildWithName(DESCRIPTION_Q);
         if (descriptionElem != null) {
             messageStore.setDescription(descriptionElem.getText());
         }
 
         messageStore.setParameters(getParameters(elem));
-        return messageStore;
-    }
 
-    private static RedeliveryProcessor populateRedeliveryProcessor(OMElement element,
-                                                                   MessageStore messageStore) {
-
-        RedeliveryProcessor redeliveryProcessor = new RedeliveryProcessor(messageStore);
-
-        OMElement intervalElm = element.getFirstChildWithName(DELAY_Q);
-        if (intervalElm != null) {
-            int delay = 1000 * Integer.parseInt(intervalElm.getText());
-            redeliveryProcessor.setRedeliveryDelay(delay);
-        }
-
-        OMElement maxRedeliveryElm = element.getFirstChildWithName(MAX_REDELIVERIES);
-
-        if (maxRedeliveryElm != null) {
-            int maxRedeliveries = Integer.parseInt(maxRedeliveryElm.getText());
-            redeliveryProcessor.setMaxRedeleveries(maxRedeliveries);
+        OMElement processorElm = elem.getFirstChildWithName(PROCESSOR_Q);
+        MessageProcessor processor = null;
+        if (processorElm != null) {
+            processor = populateMessageProcessor(processorElm);
+        } else {
+            log.warn("Creating a Message Store without ");
         }
 
-        OMElement expBOElm = element.getFirstChildWithName(ENABLE_EXPONENTIAL_BACKOFF);
-
-        if (expBOElm != null) {
-            if (JavaUtils.isTrueExplicitly(expBOElm.getText())) {
-                redeliveryProcessor.setExponentialBackoff(true);
-                OMElement multiplierElm = element.getFirstChildWithName(BACKOFF_MULTIPLIER);
-                if (multiplierElm != null) {
-                    int mulp = Integer.parseInt(multiplierElm.getText());
-                    redeliveryProcessor.setBackOffMultiplier(mulp);
-                }
-            }
+        if(processor != null) {
+            messageStore.setMessageProcessor(processor);
+        } else {
+            log.warn("Message Store Created with out a Message processor. ");
         }
-
-        return redeliveryProcessor;
+        return messageStore;
     }
 
-    private static Map<String,Object> getParameters(OMElement elem) {
+
+    private static Map<String, Object> getParameters(OMElement elem) {
         Iterator params = elem.getChildrenWithName(PARAMETER_Q);
         Map<String, Object> parameters = new HashMap<String, Object>();
 
@@ -168,7 +140,54 @@ public class MessageStoreFactory {
                 }
             }
         }
-        return parameters ;
+        return parameters;
+    }
+
+    /**
+     * Populate the Message Processor
+     *
+     * @return
+     */
+    private static MessageProcessor populateMessageProcessor(OMElement element) {
+        OMAttribute classAtt = element.getAttribute(CLASS_Q);
+        MessageProcessor processor = null;
+        if (classAtt != null) {
+            String className = classAtt.getAttributeValue();
+            try {
+                Class cls = Class.forName(className);
+                if (cls != null) {
+                    processor = (MessageProcessor) cls.newInstance();
+                    Iterator params = element.getChildrenWithName(PARAMETER_Q);
+                    Map<String, Object> parameters = new HashMap<String, Object>();
+
+                    while (params.hasNext()) {
+                        Object o = params.next();
+                        if (o instanceof OMElement) {
+                            OMElement prop = (OMElement) o;
+                            OMAttribute paramName = prop.getAttribute(NAME_Q);
+                            String paramValue = prop.getText();
+                            if (paramName != null) {
+                                if (paramValue != null) {
+                                    parameters.put(paramName.getAttributeValue(), paramValue);
+                                }
+                            } else {
+                                handleException("Invalid Message Processor parameter - Parameter must have a name ");
+                            }
+                        }
+                    }
+                    processor.setParameters(parameters);
+                } else {
+                    throw new SynapseException("Can't find Class " + className);
+                }
+            } catch (Exception e) {
+                log.error("Error while Creating MessageProcessor " + e.getMessage());
+                throw new SynapseException(e);
+            }
+        } else {
+            log.warn("Creating Message Store with out a Message processor processor");
+        }
+
+        return processor;
     }
 
     private static void handleException(String msg) {

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreMediatorFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreMediatorFactory.java?rev=1069682&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreMediatorFactory.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreMediatorFactory.java Fri Feb 11 06:16:58 2011
@@ -0,0 +1,72 @@
+/*
+*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+*
+*  WSO2 Inc. licenses this file to you under the Apache License,
+*  Version 2.0 (the "License"); you may not use this file except
+*  in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.synapse.config.xml;
+
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.mediators.store.MessageStoreMediator;
+
+import javax.xml.namespace.QName;
+import java.util.Properties;
+
+/**
+ * Creates an instance of a MessageStore mediator using XML configuration specified
+ *
+ * <pre>
+ * &lt;store messageStore = "message store name" [sequence = "sequence name"] /&gt;
+ * </pre>
+ *
+ * TODO Message store mediator will be improved with more user options
+ */
+public class MessageStoreMediatorFactory extends AbstractMediatorFactory{
+
+    private static final QName STORE_Q    = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "store");
+    private static final QName ATT_MESSAGE_STORE   = new QName("messageStore");
+    private static final QName ATT_SEQUENCE   = new QName("sequence");
+
+    @Override
+    protected Mediator createSpecificMediator(OMElement elem, Properties properties) {
+        MessageStoreMediator messageStoreMediator = new MessageStoreMediator();
+        OMAttribute nameAtt = elem.getAttribute(ATT_NAME);
+        if(nameAtt != null) {
+            messageStoreMediator.setName(nameAtt.getAttributeValue());
+        }
+
+        OMAttribute messageStoreNameAtt = elem.getAttribute(ATT_MESSAGE_STORE);
+
+        if(messageStoreNameAtt != null) {
+            messageStoreMediator.setMessageStoreName(messageStoreNameAtt.getAttributeValue());
+        } else {
+            throw new SynapseException("Message Store mediator must have a Message store defined");
+        }
+
+        OMAttribute sequenceAtt = elem.getAttribute(ATT_SEQUENCE);
+
+        if(sequenceAtt != null) {
+            messageStoreMediator.setOnStoreSequence(sequenceAtt.getAttributeValue());
+        }
+
+        return messageStoreMediator;
+    }
+
+    public QName getTagQName() {
+       return STORE_Q;
+    }
+}

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreSerializer.java?rev=1069682&r1=1069681&r2=1069682&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreSerializer.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreSerializer.java Fri Feb 11 06:16:58 2011
@@ -27,20 +27,21 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.SynapseConstants;
 import org.apache.synapse.SynapseException;
+import org.apache.synapse.message.processors.MessageProcessor;
 import org.apache.synapse.message.store.MessageStore;
 import org.apache.synapse.message.store.InMemoryMessageStore;
 
 import javax.xml.namespace.QName;
 import java.util.Iterator;
+import java.util.Map;
 
 /**
  * Serialize an instance of the given Message Store, and sets properties on it.
  * <p/>
  * &lt;messageStore name="string" class="classname" [sequence = "string" ] &gt;
- * &lt;redelivery&gt;
- * &lt;interval&gt;delay in seconds &lt;/interval&gt;
- * &lt;maximumRedeliveries&gt;maximum_number_of_redeliveries_to attempt  &lt;/maximumRedeliveries&gt;
- * &lt;/redelivery&gt;
+ * &lt;parameter name="string"&gt;"string" &lt;/parameter&gt;
+ * &lt;parameter name="string"&gt;"string" &lt;/parameter&gt;
+ * &lt;parameter name="string"&gt;"string" &lt;/parameter&gt;
  * &lt;/messageStore&gt;
  */
 public class MessageStoreSerializer {
@@ -75,36 +76,6 @@ public class MessageStoreSerializer {
             handleException("Message store Name not specified");
         }
 
-        //Redelivery processor
-        OMElement redilevery = fac.createOMElement("redelivery", synNS);
-        int reDeliveryDelay = messageStore.getRedeliveryProcessor().getRedeliveryDelay() / 1000;
-
-        OMElement delay = fac.createOMElement("interval", synNS);
-        delay.setText(String.valueOf(reDeliveryDelay));
-
-        redilevery.addChild(delay);
-
-        int maxRedeliveries = messageStore.getRedeliveryProcessor().getMaxRedeleveries();
-        OMElement maxRedeliveryElm = fac.createOMElement("maximumRedeliveries", synNS);
-        maxRedeliveryElm.setText(String.valueOf(maxRedeliveries));
-
-        redilevery.addChild(maxRedeliveryElm);
-
-        if (messageStore.getRedeliveryProcessor().isExponentialBackoffEnable()) {
-             OMElement expBOElm = fac.createOMElement("exponentialBackoff", synNS);
-             expBOElm.setText("true");
-             redilevery.addChild(expBOElm);
-
-             OMElement multiplierElm = fac.createOMElement("backoffMutiplier", synNS);
-             int multiplier = messageStore.getRedeliveryProcessor().getBackOffMultiplier();
-             if (multiplier > 0) {
-                 multiplierElm.setText(String.valueOf(multiplier));
-                 redilevery.addChild(multiplierElm);
-             }
-        }
-
-        store.addChild(redilevery);
-
         if (messageStore.getParameters() != null) {
             Iterator iter = messageStore.getParameters().keySet().iterator();
             while (iter.hasNext()) {
@@ -118,6 +89,9 @@ public class MessageStoreSerializer {
             }
         }
 
+        if(messageStore.getMessageProcessor() != null) {
+            populateProcessorElem(store,fac,messageStore);
+        }
         if (getSerializedDescription(messageStore) != null) {
             store.addChild(getSerializedDescription(messageStore));
         }
@@ -140,6 +114,23 @@ public class MessageStoreSerializer {
         }
     }
 
+    private static void populateProcessorElem(OMElement storeElem ,OMFactory factory,
+                                              MessageStore messageStore) {
+        MessageProcessor processor = messageStore.getMessageProcessor();
+        OMElement processorElem = factory.createOMElement("processor" , synNS);
+        Map<String ,Object> parameters = processor.getParameters();
+
+        Iterator<Map.Entry<String ,Object>> it = parameters.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<String,Object> entry = it.next();
+            OMElement paramElem = factory.createOMElement("parameter", synNS);
+            paramElem.addAttribute(fac.createOMAttribute("name", nullNS, entry.getKey()));
+            paramElem.setText((String)entry.getValue());
+            processorElem.addChild(paramElem);
+        }
+        storeElem.addChild(processorElem);
+    }
+
     private static void handleException(String msg) {
         log.error(msg);
         throw new SynapseException(msg);

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/AbstractEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/AbstractEndpoint.java?rev=1069682&r1=1069681&r2=1069682&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/AbstractEndpoint.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/AbstractEndpoint.java Fri Feb 11 06:16:58 2011
@@ -21,22 +21,21 @@ package org.apache.synapse.endpoints;
 
 import org.apache.axis2.clustering.ClusteringAgent;
 import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.transport.base.BaseConstants;
 import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.transport.base.BaseConstants;
 import org.apache.axis2.util.JavaUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.*;
-import org.apache.synapse.mediators.MediatorFaultHandler;
-import org.apache.synapse.message.store.StorableMessage;
-import org.apache.synapse.mediators.MediatorProperty;
-import org.apache.synapse.aspects.ComponentType;
 import org.apache.synapse.aspects.AspectConfiguration;
+import org.apache.synapse.aspects.ComponentType;
 import org.apache.synapse.aspects.statistics.StatisticsReporter;
 import org.apache.synapse.commons.jmx.MBeanRegistrar;
 import org.apache.synapse.core.SynapseEnvironment;
 import org.apache.synapse.core.axis2.Axis2MessageContext;
 import org.apache.synapse.core.axis2.Axis2SynapseEnvironment;
+import org.apache.synapse.mediators.MediatorFaultHandler;
+import org.apache.synapse.mediators.MediatorProperty;
 
 import java.util.*;
 

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java?rev=1069682&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java Fri Feb 11 06:16:58 2011
@@ -0,0 +1,86 @@
+/*
+*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+*
+*  WSO2 Inc. licenses this file to you under the Apache License,
+*  Version 2.0 (the "License"); you may not use this file except
+*  in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.synapse.mediators.store;
+
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.mediators.AbstractMediator;
+import org.apache.synapse.message.store.MessageStore;
+
+/**
+ * <code>MessageStoreMediator</code> will store the incoming Messages in associated MessageStore
+ */
+public class MessageStoreMediator extends AbstractMediator{
+
+    /**
+     * Name of the Mediator
+     */
+    private String name;
+
+    /**
+     *MessageStore Name
+     */
+    private String messageStoreName;
+
+    /**
+     * Sequence name to be invoked when the message is stored
+     */
+    private String  onStoreSequence;
+
+
+    public boolean mediate(MessageContext synCtx) {
+        if(synCtx != null) {
+            MessageStore messageStore = synCtx.getConfiguration().getMessageStore(messageStoreName);
+            if(messageStore != null) {
+                if(onStoreSequence != null) {
+                    Mediator sequence = synCtx.getSequence(onStoreSequence);
+                    if(sequence != null) {
+                        sequence.mediate(synCtx);
+                    }
+                }
+                messageStore.store(synCtx);
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getMessageStoreName() {
+        return messageStoreName;
+    }
+
+    public void setMessageStoreName(String messageStoreName) {
+        this.messageStoreName = messageStoreName;
+    }
+
+    public String  getOnStoreSequence() {
+        return onStoreSequence;
+    }
+
+    public void setOnStoreSequence(String  onStoreSequence) {
+        this.onStoreSequence = onStoreSequence;
+    }
+}

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java?rev=1069682&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java Fri Feb 11 06:16:58 2011
@@ -0,0 +1,104 @@
+/*
+*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+*
+*  WSO2 Inc. licenses this file to you under the Apache License,
+*  Version 2.0 (the "License"); you may not use this file except
+*  in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.synapse.message.processors;
+
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.message.store.MessageStore;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ *All Synapse Message Processors must implement <code>MessageProcessor</code> interface
+ *Message processors will process the Message using a Message Store.
+ *Message processing logic and process will depend on the
+ *concrete implementation of the MessageStore
+ */
+public interface MessageProcessor {
+
+    /**
+     * Start Message Processor
+     */
+    public void start();
+
+    /**
+     * Stop MessageProcessor
+     */
+    public void stop();
+
+    /**
+     * Set the Message Store that backs the Message processor
+     * @param messageStore the underlying MessageStore instance
+     */
+    public void setMessageStore(MessageStore messageStore);
+
+    /**
+     * Get the Message store that backs the Message processor
+     * @return   the underlying MessageStore instance
+     */
+    public MessageStore getMessageStore();
+
+    /**
+     * Set the Mediator/Sequence to be invoked just before processing a Message
+     * This Mediator or sequence will be invoked just before processing the Message
+     * @param mediator   Mediator/sequence instance that will be invoked just before
+     * processing a Message
+     */
+    public void setOnProcessSequence(Mediator mediator);
+
+
+    /**
+     * Get the On process Mediator or sequence
+     * @return Mediator/sequence instance that will be invoked just before processing a Message
+     */
+    public Mediator getOnProcessSequence();
+
+    /**
+     * This sequence/Mediator will be invoked when a Message is submitted to the MessageProcessor
+     * @param mediator Mediator/sequence instance that will be invoked when a Message
+     * is submitted to the Processor
+     */
+    public void setOnSubmitSequence(Mediator mediator);
+
+    /**
+     * Get the OnSubmit Sequence which get invoked when a Message is submitted to
+     * the MessageProcessor
+     * @return mediator Mediator/sequence instance that will be invoked when a Message
+     * is submitted to the Processor
+     */
+    public Mediator getOnSubmitSequence();
+
+    /**
+     * Set the Message processor parameters that will be used by the specific implementation
+     * @param parameters
+     */
+    public void setParameters(Map<String,Object> parameters);
+
+    /**
+     * Get the Message processor Parameters
+     * @return
+     */
+    public Map<String , Object> getParameters();
+
+    /**
+     * Returns whether a Message processor is started or not
+     * @return
+     */
+    public boolean isStarted();
+}

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java?rev=1069682&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java Fri Feb 11 06:16:58 2011
@@ -0,0 +1,108 @@
+/*
+*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+*
+*  WSO2 Inc. licenses this file to you under the Apache License,
+*  Version 2.0 (the "License"); you may not use this file except
+*  in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.synapse.message.processors.dlc;
+
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseArtifact;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.message.store.MessageStore;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * JMX view over the message store that backs a dead letter channel. It lets an
+ * operator list, inspect, delete and manually redeliver the stored messages.
+ *
+ * Bulk operations work on a snapshot of the store and address messages by ID, so
+ * they do not depend on the store's index order and tolerate messages that are
+ * removed by someone else (for example the redelivery worker) in the meantime.
+ */
+public class DeadLetterChannelView implements DeadLetterChannelViewMBean {
+
+    /** Store whose messages are exposed through this view. */
+    private final MessageStore messageStore;
+
+    /**
+     * @param messageStore the store this view operates on
+     */
+    public DeadLetterChannelView(MessageStore messageStore) {
+        this.messageStore = messageStore;
+    }
+
+    /**
+     * Try to redeliver every message that is in the store when the call starts.
+     * Each message is attempted exactly once; messages that cannot be delivered are
+     * put back into the store by {@link #redeliver(MessageContext)}.
+     */
+    public void resendAll() {
+        for (MessageContext candidate : snapshot()) {
+            // Claim the message by ID; null means it was removed since the snapshot.
+            MessageContext messageContext = messageStore.unstore(candidate.getMessageID());
+            if (messageContext != null) {
+                redeliver(messageContext);
+            }
+        }
+    }
+
+    /**
+     * Remove every message that is in the store when the call starts.
+     */
+    public void deleteAll() {
+        for (MessageContext messageContext : snapshot()) {
+            messageStore.unstore(messageContext.getMessageID());
+        }
+    }
+
+    /**
+     * List the IDs of the stored messages. The messages stay in the store.
+     *
+     * @return message IDs of all stored messages, never null
+     */
+    public List<String> getMessageIds() {
+        List<String> list = new ArrayList<String>();
+        for (MessageContext messageContext : snapshot()) {
+            list.add(messageContext.getMessageID());
+        }
+        return list;
+    }
+
+    /**
+     * Try to redeliver the message with the given ID. The message is taken out of
+     * the store first so that a successful resend does not leave a copy behind;
+     * if delivery is not possible it is stored again.
+     *
+     * @param messageID ID of the message to be resent; unknown IDs are ignored
+     */
+    public void resend(String messageID) {
+        MessageContext messageContext = messageStore.unstore(messageID);
+        if (messageContext != null) {
+            redeliver(messageContext);
+        }
+    }
+
+    /**
+     * Remove the message with the given ID from the store.
+     *
+     * @param messageID ID of the message to be deleted
+     */
+    public void delete(String messageID) {
+        messageStore.unstore(messageID);
+    }
+
+    /**
+     * Get the envelope of the message with the given ID as a string.
+     *
+     * @param messageID ID of the message to look up
+     * @return the envelope content, or null if there is no such message or it has no envelope
+     */
+    public String getEnvelope(String messageID) {
+        MessageContext messageContext = messageStore.getMessage(messageID);
+        if (messageContext != null && messageContext.getEnvelope() != null) {
+            return messageContext.getEnvelope().toString();
+        }
+
+        return null;
+    }
+
+    /**
+     * @return the number of messages reported by the store
+     */
+    public int getSize() {
+        return messageStore.getSize();
+    }
+
+    /**
+     * Copy the current store content so callers can modify the store while iterating.
+     * Null lists and null entries are dropped.
+     */
+    private List<MessageContext> snapshot() {
+        List<MessageContext> copy = new ArrayList<MessageContext>();
+        List<MessageContext> stored = messageStore.getAllMessages();
+        if (stored != null) {
+            for (MessageContext messageContext : stored) {
+                if (messageContext != null) {
+                    copy.add(messageContext);
+                }
+            }
+        }
+        return copy;
+    }
+
+    /**
+     * Send an already unstored message to its replay target (endpoint first, then
+     * sequence). The message is stored again when there is no target or the target
+     * did not accept it.
+     */
+    private void redeliver(MessageContext messageContext) {
+        SynapseArtifact artifact = RedeliveryProcessor.getReplayTarget(messageContext);
+        if (artifact instanceof Endpoint) {
+            if (!RedeliveryProcessor.handleEndpointReplay((Endpoint) artifact,
+                    messageContext)) {
+                messageStore.store(messageContext);
+            }
+        } else if (artifact instanceof Mediator) {
+            if (!RedeliveryProcessor.handleSequenceReplay((Mediator) artifact,
+                    messageContext)) {
+                messageStore.store(messageContext);
+            }
+        } else {
+            messageStore.store(messageContext);
+        }
+    }
+}

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelViewMBean.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelViewMBean.java?rev=1069682&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelViewMBean.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelViewMBean.java Fri Feb 11 06:16:58 2011
@@ -0,0 +1,72 @@
+/*
+*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+*
+*  WSO2 Inc. licenses this file to you under the Apache License,
+*  Version 2.0 (the "License"); you may not use this file except
+*  in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.synapse.message.processors.dlc;
+
+import java.util.List;
+
+public interface DeadLetterChannelViewMBean {
+
+    /**
+     * Try resending all messages stored in the message store.
+     */
+    public void resendAll();
+
+    /**
+     * Delete all the messages in the message store.
+     */
+    public void deleteAll();
+
+
+    /**
+     * Get the Message IDs of all stored Messages in the Message store
+     *
+     * @return a list of message ID values
+     */
+    public List<String> getMessageIds();
+
+    /**
+     * Try to resend the message with the given ID.
+     * This operation returns nothing, so it does not report whether the resend
+     * was successful.
+     *
+     * @param messageID ID of the message to be resent
+     */
+    public void  resend(String messageID);
+
+    /**
+     * Delete the message with the given ID.
+     *
+     * @param messageID ID of the message to be deleted
+     */
+    public void delete(String messageID);
+
+    /**
+     * Get the SOAP envelope of the message with the given ID.
+     *
+     * @param messageID ID of the message to be returned
+     * @return the SOAP envelope content as a string
+     */
+    public String getEnvelope(String messageID);
+
+    /**
+     * Get the number of messages in the store.
+     *
+     * @return the number of Messages stored in the store.
+     */
+    public int getSize();
+
+}

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java?rev=1069682&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java Fri Feb 11 06:16:58 2011
@@ -0,0 +1,287 @@
+/*
+*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+*
+*  WSO2 Inc. licenses this file to you under the Apache License,
+*  Version 2.0 (the "License"); you may not use this file except
+*  in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.synapse.message.processors.dlc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.*;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.message.processors.MessageProcessor;
+import org.apache.synapse.message.store.MessageStore;
+import org.apache.synapse.securevault.commons.MBeanRegistrar;
+
+import java.util.Map;
+
+/**
+ * Redelivery processor is the Message processor which implements the Dead Letter Channel EIP.
+ * Once started, a background worker repeatedly looks at the message at the head of the
+ * associated store and redelivers it to the replay target named in the message properties
+ * ({@link #REPLAY_ENDPOINT} or {@link #REPLAY_SEQUENCE}).
+ */
+public class RedeliveryProcessor implements MessageProcessor {
+
+    private static final Log log = LogFactory.getLog(RedeliveryProcessor.class);
+
+    /**
+     * Associated MessageStore
+     */
+    private MessageStore messageStore;
+
+    /**
+     * Mediator/sequence set through setOnProcessSequence; this class only keeps the reference
+     */
+    private Mediator onProcessMediator;
+
+    /**
+     * Mediator/sequence set through setOnSubmitSequence; this class only keeps the reference
+     */
+    private Mediator onSubmitMediator;
+
+    private Map<String, Object> parameters;
+
+    /**
+     * Maximum number of redeliveries per message
+     */
+    private int maxRedeleveries = 0;
+
+    /**
+     * Delay in milliseconds between two consecutive redelivery attempts
+     */
+    private int redeliveryDelay = 2000;
+
+    /**
+     * enable/disable exponential backoff
+     */
+    private boolean exponentialBackoff = false;
+
+    /**
+     * the multiplier that will be used in the exponential backoff algorithm;
+     * -1 means "not configured" and selects a linearly growing delay instead
+     */
+    private int backOffMultiplier = -1;
+
+
+    private DeadLetterChannelViewMBean dlcView;
+
+    /**
+     * Written by start()/stop() and polled by the worker thread, hence volatile
+     */
+    private volatile boolean started;
+
+    public static final String REDELIVERY_DELAY = "redelivery.delay";
+
+    public static final String MAX_REDELIVERY_COUNT = "redelivery.count";
+
+    public static final String EXPONENTIAL_BACKOFF = "redelivery.exponentialBackoff";
+
+    public static final String BACKOFF_MUTIPLIER = "redelivery.backoffMultiplier";
+
+
+    public static final String REPLAY_ENDPOINT = "replay.endpoint";
+
+    public static final String REPLAY_SEQUENCE = "replay.sequence";
+
+    public static final String NO_OF_REDELIVERIES = "number.of.redeliveries";
+
+    /**
+     * Start the background redelivery worker if it is not running yet.
+     * Synchronized so that concurrent callers cannot start two workers.
+     */
+    public synchronized void start() {
+        if (!started) {
+            // The flag must be set before the thread runs: the worker loops on it.
+            started = true;
+            Thread t = new Thread(new Worker());
+            t.start();
+        }
+    }
+
+    /**
+     * Ask the worker to stop; it ends after its current poll or sleep.
+     */
+    public void stop() {
+        started = false;
+    }
+
+    /**
+     * Associate the store to redeliver from and register a JMX view for it.
+     * @param messageStore store to poll; when null no view is registered
+     */
+    public void setMessageStore(MessageStore messageStore) {
+        this.messageStore = messageStore;
+        if(messageStore !=null) {
+            DeadLetterChannelView view = new DeadLetterChannelView(messageStore);
+            this.dlcView = view;
+            MBeanRegistrar.getInstance().registerMBean(view,"Dead Letter Channel",
+                    messageStore.getName());
+        }
+    }
+
+    public MessageStore getMessageStore() {
+        return this.messageStore;
+    }
+
+    public void setOnProcessSequence(Mediator mediator) {
+        this.onProcessMediator = mediator;
+    }
+
+    public Mediator getOnProcessSequence() {
+        return this.onProcessMediator;
+    }
+
+    public void setOnSubmitSequence(Mediator mediator) {
+        this.onSubmitMediator = mediator;
+    }
+
+    public Mediator getOnSubmitSequence() {
+        return this.onSubmitMediator;
+    }
+
+    /**
+     * Store the parameters and apply the redelivery settings found in them
+     * ({@link #REDELIVERY_DELAY}, {@link #MAX_REDELIVERY_COUNT},
+     * {@link #EXPONENTIAL_BACKOFF}, {@link #BACKOFF_MUTIPLIER}).
+     * @param parameters parameter map; null leaves the current settings untouched
+     * @throws NumberFormatException if a numeric parameter cannot be parsed
+     */
+    public void setParameters(Map<String, Object> parameters) {
+        this.parameters = parameters;
+        if (parameters == null) {
+            return;
+        }
+
+        if (parameters.containsKey(REDELIVERY_DELAY)) {
+            redeliveryDelay = toInt(parameters.get(REDELIVERY_DELAY));
+        }
+
+        if (parameters.containsKey(MAX_REDELIVERY_COUNT)) {
+            maxRedeleveries = toInt(parameters.get(MAX_REDELIVERY_COUNT));
+        }
+
+        if ("true".equals(String.valueOf(parameters.get(EXPONENTIAL_BACKOFF)))) {
+            exponentialBackoff = true;
+        }
+
+        if (parameters.containsKey(BACKOFF_MUTIPLIER)) {
+            backOffMultiplier = toInt(parameters.get(BACKOFF_MUTIPLIER));
+        }
+    }
+
+    public Map<String, Object> getParameters() {
+        return parameters;
+    }
+
+    /**
+     * Parse a parameter value given as a String (or any object whose string form is a number).
+     */
+    private static int toInt(Object value) {
+        return Integer.parseInt(String.valueOf(value).trim());
+    }
+
+    /**
+     * Background loop: one redelivery attempt per iteration while the processor is started.
+     */
+    private class Worker implements Runnable {
+
+        public void run() {
+            while (started) {
+                boolean pollAgain = false;
+                try {
+                    pollAgain = redeliverNext();
+                } catch (Throwable e) {
+                    // Best effort: a failing message must not end the redelivery thread.
+                    log.warn("Error while Running Redelivery process " + e.getMessage(), e);
+                }
+
+                if (!pollAgain && started) {
+                    // Nothing was attempted (or the attempt failed): back off instead of spinning.
+                    pause(redeliveryDelay);
+                }
+            }
+        }
+    }
+
+    /**
+     * Make one redelivery attempt for the message at the head of the store.
+     * The message stays in the store during the back-off sleep and is only taken
+     * out right before it is sent; if it cannot be delivered it is stored again.
+     *
+     * NOTE(review): only index 0 is inspected, so a message that has used up its
+     * attempts keeps later messages waiting as long as it stays at the head.
+     *
+     * @return true if a delivery was attempted, false if there was nothing to do
+     */
+    private boolean redeliverNext() {
+        MessageStore store = messageStore;
+        if (store == null || store.getSize() == 0) {
+            return false;
+        }
+
+        java.util.List<MessageContext> head = store.getMessages(0, 0);
+        MessageContext messageContext = (head == null || head.isEmpty()) ? null : head.get(0);
+        if (messageContext == null) {
+            return false;
+        }
+
+        if (messageContext.getProperty(NO_OF_REDELIVERIES) == null) {
+            messageContext.setProperty(NO_OF_REDELIVERIES, "0");
+        }
+        int number = Integer.parseInt(
+                String.valueOf(messageContext.getProperty(NO_OF_REDELIVERIES)));
+
+        if (number >= maxRedeleveries) {
+            if (log.isDebugEnabled()) {
+                log.debug("Maximum number of attempts tried for Message with ID " +
+                        messageContext.getMessageID() +
+                        ", it is left in the Message Store");
+            }
+            return false;
+        }
+
+        if (!pause(computeDelay(number))) {
+            // Interrupted: the processor is stopping, leave the message where it is.
+            return false;
+        }
+
+        // Claim by ID so that exactly the inspected message is removed; null means
+        // it was deleted or resent by someone else during the sleep.
+        MessageContext claimed = store.unstore(messageContext.getMessageID());
+        if (claimed == null) {
+            return false;
+        }
+
+        claimed.setProperty(NO_OF_REDELIVERIES, "" + (number + 1));
+
+        boolean delivered = false;
+        try {
+            delivered = deliver(getReplayTarget(claimed), claimed);
+        } finally {
+            if (!delivered) {
+                // Covers "target not ready", "no target" and exceptions while sending.
+                store.store(claimed);
+            }
+        }
+
+        if (delivered && log.isDebugEnabled()) {
+            log.debug("sent \n" + claimed.getEnvelope());
+        }
+        return true;
+    }
+
+    /**
+     * Delay before the next attempt of a message that was already redelivered
+     * {@code number} times: the plain delay, a linearly growing delay when back-off
+     * is enabled without a multiplier, or multiplier^number times the delay.
+     * The result is clamped to [0, Integer.MAX_VALUE] so it is always a valid sleep time.
+     */
+    private long computeDelay(int number) {
+        if (!exponentialBackoff) {
+            return Math.max(0, redeliveryDelay);
+        }
+
+        double factor = (backOffMultiplier == -1)
+                ? number + 1
+                : Math.pow(backOffMultiplier, number);
+        double delay = factor * redeliveryDelay;
+        if (!(delay > 0)) {
+            return 0L;
+        }
+        return (long) Math.min(delay, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Sleep for the given time. An interrupt is treated as a request to stop the processor.
+     * @return true if the full time elapsed, false if the sleep was interrupted
+     */
+    private boolean pause(long millis) {
+        try {
+            Thread.sleep(Math.max(0L, millis));
+            return true;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            started = false;
+            return false;
+        }
+    }
+
+    /**
+     * Hand the message to its replay target.
+     * @return true if the target took the message, false if there is no usable target
+     */
+    private static boolean deliver(SynapseArtifact artifact, MessageContext messageContext) {
+        if (artifact instanceof Endpoint) {
+            return handleEndpointReplay((Endpoint) artifact, messageContext);
+        }
+        if (artifact instanceof Mediator) {
+            return handleSequenceReplay((Mediator) artifact, messageContext);
+        }
+        return false;
+    }
+
+    /**
+     * Resolve the replay target of a message from its properties.
+     * @param context message to be replayed
+     * @return the endpoint named by {@link #REPLAY_ENDPOINT} if that property is set,
+     * otherwise the sequence named by {@link #REPLAY_SEQUENCE}, otherwise null
+     */
+    public static SynapseArtifact getReplayTarget(MessageContext context) {
+        //Endpoint replay get priority
+        if (context.getProperty(REPLAY_ENDPOINT) != null) {
+            String endpointName = (String) context.getProperty(REPLAY_ENDPOINT);
+            return context.getConfiguration().getDefinedEndpoints().get(endpointName);
+        } else if (context.getProperty(REPLAY_SEQUENCE) != null) {
+            String sequenceName = (String) context.getProperty(REPLAY_SEQUENCE);
+
+            return context.getConfiguration().getSequence(sequenceName);
+        }
+
+        return null;
+    }
+
+
+    /**
+     * Send the message through the endpoint if the endpoint is ready.
+     * @return true if the message was handed to the endpoint, false if it was not ready
+     */
+    public static boolean handleEndpointReplay(Endpoint endpoint, MessageContext messageContext) {
+        if (endpoint.readyToSend()) {
+            endpoint.send(messageContext);
+            return true;
+        }
+
+        return false;
+    }
+
+
+    /**
+     * Run the message through the given mediator/sequence.
+     * @return always true; the result of the mediation is not inspected
+     */
+    public static boolean handleSequenceReplay(Mediator mediator, MessageContext messageContext) {
+        mediator.mediate(messageContext);
+        return true;
+    }
+
+    /**
+     * Get the DLC related JMX API
+     * @return   instance of the Dead letter channel JMX API, or null if no store was set
+     */
+    public DeadLetterChannelViewMBean getDlcView() {
+        return dlcView;
+    }
+
+    /**
+     * Get the started status of the Message processor
+     * @return started status of message processor (true/false)
+     */
+    public boolean isStarted() {
+        return started;
+    }
+}

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java?rev=1069682&r1=1069681&r2=1069682&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java Fri Feb 11 06:16:58 2011
@@ -22,13 +22,11 @@ package org.apache.synapse.message.store
 import org.apache.synapse.ManagedLifecycle;
 import org.apache.synapse.Mediator;
 import org.apache.synapse.MessageContext;
-import org.apache.synapse.SynapseConstants;
 import org.apache.synapse.commons.jmx.MBeanRegistrar;
 import org.apache.synapse.config.SynapseConfiguration;
 import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.message.processors.MessageProcessor;
 
-import java.util.LinkedList;
-import java.util.Queue;
 import java.util.Map;
 
 public abstract class AbstractMessageStore implements MessageStore, ManagedLifecycle {
@@ -39,16 +37,6 @@ public abstract class AbstractMessageSto
     protected String name;
 
     /**
-     * associated redelivery processor
-     */
-    protected RedeliveryProcessor redeliveryProcessor;
-
-    /**
-     * queue that holds the sheduled messages
-     */
-    protected Queue<StorableMessage> scheduledMessageQueue = new LinkedList<StorableMessage>();
-
-    /**
      * name of the sequence to be executed before storing the message
      */
     protected String sequence;
@@ -59,17 +47,17 @@ public abstract class AbstractMessageSto
     protected MessageStoreView messageStoreMBean;
 
     /**
-     * synapse configuration reffrence
+     * synapse configuration reference
      */
     protected SynapseConfiguration synapseConfiguration;
 
     /**
-     * synapse environment reffrence
+     * synapse environment reference
      */
     protected SynapseEnvironment synapseEnvironment;
 
     /**
-     * Message store properties
+     * Message store parameters
      */
     protected Map<String,Object> parameters;
 
@@ -83,6 +71,11 @@ public abstract class AbstractMessageSto
      */
     protected String fileName;
 
+    /**
+     * Message processor instance associated with the MessageStore
+     */
+    protected MessageProcessor processor;
+
     public void init(SynapseEnvironment se) {
         this.synapseEnvironment = se;
         this.synapseConfiguration = synapseEnvironment.getSynapseConfiguration();
@@ -96,41 +89,13 @@ public abstract class AbstractMessageSto
         this.name = name;
         messageStoreMBean = new MessageStoreView(name, this);
         MBeanRegistrar.getInstance().registerMBean(messageStoreMBean,
-                "DeadLetterChannel", this.name);
-    }
-
-    public void setRedeliveryProcessor(RedeliveryProcessor redeliveryProcessor) {
-        this.redeliveryProcessor = redeliveryProcessor;
-    }
-
-    public RedeliveryProcessor getRedeliveryProcessor() {
-        return redeliveryProcessor;
+                "MessageStore", this.name);
     }
 
 
-    public void schedule(StorableMessage storableMessage) {
-        if (storableMessage != null) {
-            scheduledMessageQueue.add(storableMessage);
-        }
-
-        if (scheduledMessageQueue.size() > 0 && redeliveryProcessor != null &&
-                !redeliveryProcessor.isStarted()) {
-            redeliveryProcessor.start();
-        }
-    }
-
-    public StorableMessage dequeueScheduledQueue() {
-        return scheduledMessageQueue.poll();
-    }
-
-    public StorableMessage getFirstSheduledMessage() {
-        return scheduledMessageQueue.peek();
-    }
-
     protected void mediateSequence(MessageContext synCtx) {
 
-        if (sequence != null && synCtx != null && "true".equalsIgnoreCase(
-                (String) synCtx.getProperty(SynapseConstants.MESSAGE_STORE_REDELIVERED))) {
+        if (sequence != null && synCtx != null) {
             Mediator seq = synCtx.getSequence(sequence);
             if (seq != null) {
                 seq.mediate(synCtx);
@@ -138,6 +103,14 @@ public abstract class AbstractMessageSto
         }
     }
 
+    /**
+     * Set the message processor associated with this message store.
+     * @param messageProcessor processor instance to keep for this store
+     */
+    public void setMessageProcessor(MessageProcessor messageProcessor) {
+        this.processor = messageProcessor;
+    }
+
+    /**
+     * Get the message processor associated with this message store.
+     * @return the processor set through setMessageProcessor, or null if none was set
+     */
+    public MessageProcessor getMessageProcessor() {
+        return processor;
+    }
+
     public int getSize() {
         return -1;
     }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java?rev=1069682&r1=1069681&r2=1069682&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java Fri Feb 11 06:16:58 2011
@@ -21,6 +21,7 @@ package org.apache.synapse.message.store
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.MessageContext;
 
 import java.util.*;
 
@@ -32,46 +33,86 @@ public class InMemoryMessageStore extend
     private static final Log log = LogFactory.getLog(InMemoryMessageStore.class);
 
     /** The map that keeps the stored messages */
-    private Map<String, StorableMessage> messageList = new HashMap<String, StorableMessage>();
+    private Map<String, MessageContext> messageList = new HashMap<String, MessageContext>();
 
-    public void store(StorableMessage storableMessage) {
-        if (storableMessage != null) {
-            mediateSequence(storableMessage.getMessageContext());
-            messageList.put(storableMessage.getMessageContext().getMessageID(), storableMessage);
+    public void store(MessageContext messageContext) {
+        if (messageContext != null) {
+            mediateSequence(messageContext);
+            messageList.put(messageContext.getMessageID(), messageContext);
+
+            /**
+             * If the associated processor is not started we start it. When storing the message
+             */
+            if(!processor.isStarted()) {
+                processor.start();
+            }
             if (log.isDebugEnabled()) {
-                log.debug("Message " + storableMessage.getMessageContext().getMessageID() +
+                log.debug("Message " + messageContext.getMessageID() +
                         " has been stored");
             }
         }
     }
 
-    public StorableMessage unstore(String messageID) {
+    public MessageContext unstore(String messageID) {
         if (messageID != null) {
             return messageList.remove(messageID);
         }
         return null;
     }
 
-    public List<StorableMessage> unstoreAll() {
-        List<StorableMessage> returnList = new ArrayList<StorableMessage>();
+    public List<MessageContext> unstoreAll() {
+        List<MessageContext> returnList = new ArrayList<MessageContext>();
         for (String k : messageList.keySet()) {
             returnList.add(messageList.remove(k));
         }
         return returnList;
     }
 
-    public List<StorableMessage> getAllMessages() {
-        List<StorableMessage> returnlist = new ArrayList<StorableMessage>();
-
+    public List<MessageContext> unstore(int maxNumberOfMessages) {
+        List<MessageContext> returnList = new ArrayList<MessageContext>();
         Iterator<String> it = messageList.keySet().iterator();
-        while (it.hasNext()) {
-            returnlist.add(messageList.get(it.next()));
+        while (it.hasNext() && maxNumberOfMessages > 0) {
+            returnList.add(messageList.get(it.next()));
+            maxNumberOfMessages--;
         }
 
+        return returnList;
+    }
+
+    /**
+     * Remove and return the messages at positions 'from' to 'to' (both inclusive)
+     * of the current key order of the underlying map.
+     * @param from start index
+     * @param to stop index (inclusive)
+     * @return the removed messages, or an empty list if the range does not lie
+     * completely inside the store
+     */
+    public List<MessageContext> unstore(int from, int to) {
+        List<MessageContext> returnlist = new ArrayList<MessageContext>();
+        String[] keys = messageList.keySet().toArray(new String[0]);
+
+        // 'to' is an inclusive index, so it must be strictly below the number of keys.
+        if (from >= 0 && from <= to && to < keys.length) {
+            for (int i = from; i <= to; i++) {
+                returnlist.add(messageList.remove(keys[i]));
+            }
+        }
+        return returnlist;
+    }
+
+    /**
+     * Return, without removing them, the messages at positions 'from' to 'to'
+     * (both inclusive) of the current key order of the underlying map.
+     * @param from start index
+     * @param to stop index (inclusive)
+     * @return the selected messages, or an empty list if the range does not lie
+     * completely inside the store
+     */
+    public List<MessageContext> getMessages(int from, int to) {
+        List<MessageContext> returnlist = new ArrayList<MessageContext>();
+        String[] keys = messageList.keySet().toArray(new String[0]);
+
+        // 'to' is an inclusive index, so it must be strictly below the number of keys.
+        if (from >= 0 && from <= to && to < keys.length) {
+            for (int i = from; i <= to; i++) {
+                returnlist.add(messageList.get(keys[i]));
+            }
+        }
         return returnlist;
     }
 
-    public StorableMessage getMessage(String messageId) {
+    /**
+     * Return all stored messages without removing them.
+     * @return a new list holding the current values of the underlying map
+     */
+    public List<MessageContext> getAllMessages() {
+        return new ArrayList<MessageContext>(messageList.values());
+    }
+
+    public MessageContext getMessage(String messageId) {
         if (messageId != null) {
             return messageList.get(messageId);
         }
@@ -79,6 +120,18 @@ public class InMemoryMessageStore extend
         return null;
     }
 
+    /**
+     * Return up to the given number of stored messages without removing them,
+     * taken in the iteration order of the underlying map.
+     * @param maxNumberOfMessages upper bound on the number of messages returned
+     * @return list with at most maxNumberOfMessages messages
+     */
+    public List<MessageContext> getMessages(int maxNumberOfMessages) {
+        List<MessageContext> selected = new ArrayList<MessageContext>();
+        int remaining = maxNumberOfMessages;
+
+        for (MessageContext stored : messageList.values()) {
+            if (remaining <= 0) {
+                break;
+            }
+            selected.add(stored);
+            remaining--;
+        }
+
+        return selected;
+    }
+
     public int getSize() {
         return messageList.size();
     }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java?rev=1069682&r1=1069681&r2=1069682&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java Fri Feb 11 06:16:58 2011
@@ -19,9 +19,11 @@
 
 package org.apache.synapse.message.store;
 
+import org.apache.synapse.MessageContext;
 import org.apache.synapse.config.SynapseConfiguration;
 import org.apache.synapse.SynapseArtifact;
 import org.apache.synapse.Nameable;
+import org.apache.synapse.message.processors.MessageProcessor;
 
 import java.util.List;
 import java.util.Map;
@@ -33,71 +35,56 @@ import java.util.Map;
 public interface MessageStore extends SynapseArtifact, Nameable {
 
     /**
-     * store the Message in the Message Store
-     * Underlying message store implementation must handle the efficient way of storing the Message
-     * @param storableMessage wrapper of the Message context
+     * Store the Message in the Message Store
+     * @param messageContext  MessageContext to be saved
      */
-    public void store(StorableMessage storableMessage);
+    public void store(MessageContext messageContext);
 
     /**
-     * Store the Message in schedule queue to redeliver
-     *
-     * @param storableMessage A StorableMessage instance
+     * Delete and return the MessageContext with given Message id
+     * @param messageID  message id of the Message
+     * @return  MessageContext instance
      */
-    public void schedule(StorableMessage storableMessage);
-
-
-    public StorableMessage dequeueScheduledQueue();
+    public MessageContext unstore(String messageID);
 
     /**
-     * return the Message That is on top of the queue
-     *
-     * @return A StorableMessage instance or null
+     * Delete all the Messages in the Message Store
+     * @return  List of all messages in store
      */
-    public StorableMessage getFirstSheduledMessage();
+    public List<MessageContext> unstoreAll();
 
-    /**
-     * Unstore the Message with Given Message Id from the MessageStore
-     * @param messageID a message ID string
-     * @return unstored Message
-     */
-    public StorableMessage unstore(String messageID);
 
     /**
-     * Delete all the Messages in the Message Store
-     * @return  List of all messages in store
+     * Unstore Messages from index 'from' to index 'to'.
+     * Message ordering depends on the implementation.
+     * @param from start index
+     * @param to  stop index
+     * @return   list of messages that belong to the given range
      */
-    public List<StorableMessage> unstoreAll();
+    public List<MessageContext> unstore(int from , int to);
 
     /**
      * Get the All messages in the Message store without removing them from the queue
      * @return List of all Messages
      */
-    public List<StorableMessage> getAllMessages();
+    public List<MessageContext> getAllMessages();
 
     /**
      * Get the Message with the given ID from the Message store without removing it
      * @param messageId A message ID string
      * @return Message with given ID
      */
-    public StorableMessage getMessage(String messageId);
+    public MessageContext getMessage(String messageId);
 
-    /**
-     * Set the redelivery processor instance associated with the Message Store
-     * redelivery processor have the responsibility of redelivery message according
-     * to a policy defined
-     *
-     * @param redeliveryProcessor The redelivery processor to be registered
-     */
-    public void setRedeliveryProcessor(RedeliveryProcessor redeliveryProcessor);
 
     /**
-    * Return the redelivery processor instance associated with the message store
-     *
-     * @return A RedlieveryProcessor or null
+     * Get Messages from index 'from' to index 'to'.
+     * Message ordering depends on the implementation.
+     * @param from start index
+     * @param to  stop index
+     * @return   list of messages that belong to the given range
      */
-    public RedeliveryProcessor getRedeliveryProcessor();
-
+    public List<MessageContext> getMessages(int from , int to);
     /**
      * set the implementation specific parameters
      * @param parameters A map of parameters or null
@@ -156,5 +143,17 @@ public interface MessageStore extends Sy
      * @return Name of the file where this artifact is defined
      */
     public String getFileName();
-    
+
+
+    /**
+     * Set the Message Processor associated with the Message Store.
+     * @param messageProcessor message processor instance associated with the message store
+     */
+    public void setMessageProcessor(MessageProcessor messageProcessor);
+
+    /**
+     * Get the Message Processor associated with the Message Store.
+     * @return message processor instance associated with the message store
+     */
+    public MessageProcessor getMessageProcessor();
 }
\ No newline at end of file

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreView.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreView.java?rev=1069682&r1=1069681&r2=1069682&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreView.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreView.java Fri Feb 11 06:16:58 2011
@@ -21,6 +21,7 @@ package org.apache.synapse.message.store
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.MessageContext;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -38,19 +39,6 @@ public class MessageStoreView implements
         this.messageStore = messageStore;
     }
 
-    public void resendAll() {
-        List<StorableMessage> list = messageStore.getAllMessages();
-
-        for(int i = 0; i < list.size(); i++) {
-            StorableMessage m = list.get(i);
-            // wait till the endpoint is ready
-            while(!m.getEndpoint().readyToSend());
-            //resend
-            m.getEndpoint().send(m.getMessageContext());
-        }
-
-        log.info("All Messages in Message Store " +messageStoreName+ " were resent");
-    }
 
     public void deleteAll() {
         messageStore.unstoreAll();
@@ -60,36 +48,17 @@ public class MessageStoreView implements
     public List<String> getMessageIds() {
 
         List<String> returnList = new ArrayList<String>();
-        List<StorableMessage> list = messageStore.getAllMessages();
+        List<MessageContext> list = messageStore.getAllMessages();
 
-        for(StorableMessage m : list) {
-            returnList.add(m.getMessageContext().getMessageID());
+        for(MessageContext m : list) {
+            returnList.add(m.getMessageID());
         }
         return returnList;
     }
 
-    public boolean resend(String messageID) {
-
-        StorableMessage m = messageStore.getMessage(messageID);
-
-        if (m != null) {
-            if (m.getEndpoint().readyToSend()) {
-                m.getEndpoint().send(m.getMessageContext());
-                log.info("Message with ID " + messageID + " resent via the Endpoint" +
-                        m.getEndpoint().getName());
-                return true;
-            } else {
-                log.info("Message with ID " + messageID +" unable resent via the Endpoint" +
-                        m.getEndpoint().getName());
-            }
-        }
-
-        return false;
-    }
-
     public void delete(String messageID) {
         if(messageID != null) {
-            StorableMessage m =messageStore.unstore(messageID);
+            MessageContext m =messageStore.unstore(messageID);
             if (m != null){
                 log.info("Message with ID :" + messageID + " removed from the MessageStore");
             }
@@ -98,10 +67,10 @@ public class MessageStoreView implements
 
     public String getEnvelope(String messageID) {
         if (messageID != null) {
-            StorableMessage m = messageStore.getMessage(messageID);
+            MessageContext m = messageStore.getMessage(messageID);
 
             if (m != null) {
-                return m.getMessageContext().getEnvelope().toString();
+                return m.getEnvelope().toString();
             }
         }
         return null;
@@ -110,4 +79,8 @@ public class MessageStoreView implements
     public int getSize() {
         return messageStore.getSize();
     }
+
+    public void delete(int maxCount) {
+        messageStore.unstore(0,maxCount-1);
+    }
 }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreViewMBean.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreViewMBean.java?rev=1069682&r1=1069681&r2=1069682&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreViewMBean.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreViewMBean.java Fri Feb 11 06:16:58 2011
@@ -24,31 +24,24 @@ import java.util.List;
 public interface MessageStoreViewMBean {
 
     /**
-     * try resending all messages stored in the message store via associated endpoints.
-     */
-    public void resendAll();
-
-    /**
      * Delete all the Messages in Message store
      */
     public void deleteAll();
 
 
     /**
+     * Delete given number of Messages from the MessageStore
+     * @param maxCount the maximum number of messages to delete
+     */
+    public void delete(int maxCount);
+
+    /**
      * Get the Message IDs of all stored Messages in the Message store
      *
      * @return a list of message ID values
      */
     public List<String> getMessageIds();
 
-    /**
-     * Resend the Message with the given id
-     * return false if fail to re try deliver the message
-     *
-     * @param messageID ID of the message to be resent
-     * @return true if the resend operation was successful and false otherwise
-     */
-    public boolean  resend(String messageID);
 
     /**
      * Delete the Message with Given id
@@ -58,7 +51,7 @@ public interface MessageStoreViewMBean {
     public void delete(String messageID);
 
     /**
-     * Get the SOAP envelope of the given Messaage with given ID
+     * Get the SOAP envelope of the given Message with given ID
      *
      * @param messageID ID of the message to be returned
      * @return the SOAP envelope content as a string

Modified: synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/MessageStoreSerializationTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/MessageStoreSerializationTest.java?rev=1069682&r1=1069681&r2=1069682&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/MessageStoreSerializationTest.java (original)
+++ synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/MessageStoreSerializationTest.java Fri Feb 11 06:16:58 2011
@@ -39,12 +39,6 @@ public class MessageStoreSerializationTe
         String messageStoreConfiguration = "<syn:messageStore xmlns:syn=\"" +
                 "http://ws.apache.org/ns/synapse\"" +
                 " name=\"foo\" sequence=\"seq1\" >" +
-                "<syn:redelivery>" +
-                "<syn:interval>1</syn:interval>" +
-                "<syn:maximumRedeliveries>5</syn:maximumRedeliveries>" +
-                "<syn:exponentialBackoff>true</syn:exponentialBackoff>" +
-                "<syn:backoffMutiplier>2</syn:backoffMutiplier>" +
-                "</syn:redelivery>" +
                 "</syn:messageStore>";
 
         OMElement messageStoreElement = createOMElement(messageStoreConfiguration);