You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ch...@apache.org on 2013/05/29 20:55:02 UTC

svn commit: r1487592 - in /synapse/trunk/java/modules/core/src: main/java/org/apache/synapse/config/xml/ main/java/org/apache/synapse/message/processors/resequence/ test/java/org/apache/synapse/config/xml/

Author: charith
Date: Wed May 29 18:55:01 2013
New Revision: 1487592

URL: http://svn.apache.org/r1487592
Log:
Applying patch for SYNAPSE-893 .Thanks Buddhima Wijeweera for contribution. 

Added:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/resequence/
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/resequence/ResequencingJob.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/resequence/ResequencingProcessor.java
Modified:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorFactory.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorSerializer.java
    synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/MessageProcessorSerializationTest.java

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorFactory.java?rev=1487592&r1=1487591&r2=1487592&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorFactory.java Wed May 29 18:55:01 2013
@@ -25,6 +25,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.synapse.SynapseConstants;
 import org.apache.synapse.SynapseException;
 import org.apache.synapse.message.processors.MessageProcessor;
+import org.apache.synapse.util.xpath.SynapseXPath;
+import org.jaxen.JaxenException;
 
 import javax.xml.namespace.QName;
 import java.util.HashMap;
@@ -47,24 +49,26 @@ public class MessageProcessorFactory {
     private static final Log log = LogFactory.getLog(MessageProcessorFactory.class);
     public static final QName CLASS_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "class");
     public static final QName NAME_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "name");
+    public static final QName EXPRESSION_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "expression");
     public static final QName PARAMETER_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE,
-            "parameter");
-    public static final QName MESSAGE_STORE_Q = new QName(XMLConfigConstants.NULL_NAMESPACE ,
-            "messageStore");
+                                                      "parameter");
+    public static final QName MESSAGE_STORE_Q = new QName(XMLConfigConstants.NULL_NAMESPACE,
+                                                          "messageStore");
     private static final QName DESCRIPTION_Q
             = new QName(SynapseConstants.SYNAPSE_NAMESPACE, "description");
 
 
     /**
      * Creates a Message processor instance from given xml configuration element
+     *
      * @param elem OMElement of that contain the Message processor configuration
-     * @return  created message processor instance
+     * @return created message processor instance
      */
     public static MessageProcessor createMessageProcessor(OMElement elem) {
         MessageProcessor processor = null;
         OMAttribute clssAtt = elem.getAttribute(CLASS_Q);
 
-        if(clssAtt != null) {
+        if (clssAtt != null) {
             try {
                 Class cls = Class.forName(clssAtt.getAttributeValue());
                 processor = (MessageProcessor) cls.newInstance();
@@ -86,7 +90,7 @@ public class MessageProcessorFactory {
 
         OMAttribute storeAtt = elem.getAttribute(MESSAGE_STORE_Q);
 
-        if(storeAtt != null) {
+        if (storeAtt != null) {
             assert processor != null;
             processor.setMessageStoreName(storeAtt.getAttributeValue());
         } else {
@@ -115,9 +119,19 @@ public class MessageProcessorFactory {
                 OMElement prop = (OMElement) o;
                 OMAttribute paramName = prop.getAttribute(NAME_Q);
                 String paramValue = prop.getText();
+                OMAttribute paramExpression = prop.getAttribute(EXPRESSION_Q);
                 if (paramName != null) {
-                    if (paramValue != null) {
+                    if (paramExpression != null) {
+                        try {
+                            SynapseXPath expression = SynapseXPathFactory.getSynapseXPath(prop, paramExpression.getAttributeValue());
+                            parameters.put(paramName.getAttributeValue(), expression);
+                        } catch (JaxenException e) {
+                            handleException("Error while creating expression " + e.getMessage());
+                        }
+                    } else if (paramValue != null) {
                         parameters.put(paramName.getAttributeValue(), paramValue);
+                    } else {
+                        handleException("Invalid MessageStore parameter - Parameter must have a value or an expression ");
                     }
                 } else {
                     handleException("Invalid MessageStore parameter - Parameter must have a name ");

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorSerializer.java?rev=1487592&r1=1487591&r2=1487592&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorSerializer.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorSerializer.java Wed May 29 18:55:01 2013
@@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.synapse.SynapseConstants;
 import org.apache.synapse.SynapseException;
 import org.apache.synapse.message.processors.MessageProcessor;
+import org.apache.synapse.util.xpath.SynapseXPath;
 
 import javax.xml.namespace.QName;
 import java.util.Iterator;
@@ -39,6 +40,7 @@ import java.util.Iterator;
  * <parameter name="string"&gt"string" <parameter>
  * <parameter name="string"&gt"string" <parameter>
  * <parameter name="string"&gt"string" <parameter>
+ * <parameter name="string" expression="string"/>
  * .
  * .
  * </messageProcessor>
@@ -55,15 +57,17 @@ public class MessageProcessorSerializer 
 
     /**
      * Serialize a give Message processor instance to XML configuration
-     * @param parent parent configuration
+     *
+     * @param parent    parent configuration
      * @param processor message processor instance
-     * @return  created XML configuration
+     * @return created XML configuration
      */
-    public static OMElement serializeMessageProcessor(OMElement parent, MessageProcessor processor) {
+    public static OMElement serializeMessageProcessor(OMElement parent,
+                                                      MessageProcessor processor) {
         OMElement processorElem = fac.createOMElement("messageProcessor", synNS);
         if (processor != null) {
             processorElem.addAttribute(fac.createOMAttribute("class", nullNS,
-                    processor.getClass().getName()));
+                                                             processor.getClass().getName()));
         } else {
             handleException("Invalid processor. Provider is required");
         }
@@ -74,21 +78,32 @@ public class MessageProcessorSerializer 
             handleException("Message store Name not specified");
         }
 
-        if(processor.getMessageStoreName() != null) {
+        if (processor.getMessageStoreName() != null) {
             processorElem.addAttribute(fac.createOMAttribute(
-                    "messageStore",nullNS,processor.getMessageStoreName()));
+                    "messageStore", nullNS, processor.getMessageStoreName()));
         }
 
         if (processor.getParameters() != null) {
             Iterator iterator = processor.getParameters().keySet().iterator();
             while (iterator.hasNext()) {
                 String name = (String) iterator.next();
-                String value = (String) processor.getParameters().get(name);
+                Object object = processor.getParameters().get(name);
                 OMElement property = fac.createOMElement("parameter", synNS);
                 property.addAttribute(fac.createOMAttribute(
                         "name", nullNS, name));
-                property.setText(value.trim());
+
+                if (object instanceof String) {
+                    String value = (String) object;
+                    property.setText(value.trim());
+
+                } else if (object instanceof SynapseXPath) {
+                    SynapseXPath expression = (SynapseXPath) object;
+                    SynapseXPathSerializer.serializeXPath(expression,property,"expression");
+
+                }
+
                 processorElem.addChild(property);
+
             }
         }
 

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/resequence/ResequencingJob.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/resequence/ResequencingJob.java?rev=1487592&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/resequence/ResequencingJob.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/resequence/ResequencingJob.java Wed May 29 18:55:01 2013
@@ -0,0 +1,273 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.message.processors.resequence;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.message.processors.MessageProcessorConsents;
+import org.apache.synapse.message.processors.ScheduledMessageProcessor;
+import org.apache.synapse.message.store.MessageStore;
+import org.apache.synapse.util.xpath.SynapseXPath;
+import org.quartz.Job;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * All necessary logic for Resequencing is implemented within this class.
+ * This class implements the Job interface which comes from Quartz
+ */
+public class ResequencingJob implements Job {
+
+    /**
+     * Log is set to the current class
+     */
+    private static final Log log = LogFactory.getLog(ResequencingJob.class);
+
+    /**
+     * Reads the processor settings from the job data and forwards stored messages to the
+     * configured sequence in sequence-number order.
+     * While the starting sequence number is unknown the processor is deactivated and the number
+     * is selected from the message store. Afterwards the store is scanned repeatedly: the message
+     * carrying the next expected sequence number is removed and sent, until a pass finds none.
+     * Once past the initial checks the processor is reactivated on every exit path, also on failure.
+     *
+     * @param jobExecutionContext - a bundle with information related to environment
+     * @throws JobExecutionException - to indicate Quartz scheduler that an error occurred while executing the job
+     */
+    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
+
+        final JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
+        final MessageStore messageStore = (MessageStore) jdm.get(MessageProcessorConsents.MESSAGE_STORE);
+        final ResequencingProcessor processor = (ResequencingProcessor) jdm.get(
+                ScheduledMessageProcessor.PROCESSOR_INSTANCE);
+        final Map<String, Object> parameters = (Map<String, Object>) jdm.get(MessageProcessorConsents.PARAMETERS);
+
+        /** Checking for existence and activation of processor or existence of message store  */
+        if (processor == null || !processor.isActive() || messageStore == null) {
+            return;
+        }
+
+        /** The parameter map may be missing, so every read of it stays behind one null check */
+        String sequence = null;
+        SynapseXPath seqNoxPath = null;
+        if (parameters != null) {
+            sequence = (String) parameters.get(ResequencingProcessor.NEXT_SEQUENCE);
+
+            /** Extract the SynapseXpath object to identify the sequence number of the message */
+            seqNoxPath = (SynapseXPath) parameters.get(ResequencingProcessor.SEQUENCE_NUMBER_XPATH);
+
+            /** Extract the number of messages interested to come */
+            if (parameters.get(ResequencingProcessor.REQ_INIT_MSGS) != null) {
+                processor.setRequiredInitMsgs(new AtomicInteger(Integer.parseInt((String) parameters.get(
+                        ResequencingProcessor.REQ_INIT_MSGS))));
+            }
+
+            /** Extract the delay wait until the interested messages come */
+            if (parameters.get(ResequencingProcessor.REQ_INIT_MSGS_DELAY) != null) {
+                processor.setRequiredInitMsgsDelay(new AtomicInteger(Integer.parseInt((String) parameters.get(
+                        ResequencingProcessor.REQ_INIT_MSGS_DELAY))));
+            }
+
+            /** Extract whether to delete duplicate messages */
+            if (parameters.get(ResequencingProcessor.DELETE_DUPLICATES) != null) {
+                String result = (String) parameters.get(ResequencingProcessor.DELETE_DUPLICATES);
+                if (result.equalsIgnoreCase("TRUE")) {
+                    processor.setDeleteDuplicates(new AtomicBoolean(true));
+                }
+            }
+        }
+
+        try {
+            if (!processor.isInitSeqNo().get()) {
+                /** Deactivating Resequencing processor to avoid executing multiple Job instances */
+                processor.deactivate();
+
+                do {
+                    delay(ResequencingProcessor.STARTING_NUMBER_INIT_DELAY);
+
+                    if (messageStore.size() >= processor.getRequiredInitMsgs().get()) {
+                        selectStartingSeqNo(processor, messageStore, seqNoxPath);
+                        break;
+                    }
+                    processor.tried++;
+                } while (processor.tried < processor.getRequiredInitMsgsDelay().get());
+            }
+
+            if (!processor.isInitSeqNo().get()) {
+                /** Still not initialized: wait until the store holds at least one message */
+                while (messageStore.size() <= 0) {
+                    /** Give up waiting once the thread has been interrupted */
+                    if (Thread.currentThread().isInterrupted()) {
+                        return;
+                    }
+                    /** Sleep between polls instead of spinning on the store without pause */
+                    delay(ResequencingProcessor.STARTING_NUMBER_INIT_DELAY);
+                }
+
+                selectStartingSeqNo(processor, messageStore, seqNoxPath);
+                if (!processor.isInitSeqNo().get()) {
+                    log.warn("Resequencer failed to select starting sequence number with in given timeout !");
+                }
+            }
+
+            /** Continue to this section happens only after initializing the starting sequence number */
+            boolean errorStop = false;
+            while (!errorStop) {
+
+                /** Iterate through message store; a null entry ends the pass and the job */
+                for (int messageIndex = 0; ; messageIndex++) {
+                    MessageContext messageContext = messageStore.get(messageIndex);
+                    if (messageContext == null) {
+                        errorStop = true;
+                        break;
+                    }
+
+                    /** Extract the sequence number from the message */
+                    int sequenceNo;
+                    try {
+                        sequenceNo = Integer.parseInt(seqNoxPath.stringValueOf(messageContext));
+                    } catch (Exception e) {
+                        log.warn("Can't Find sequence number from message " + e.getMessage());
+                        continue;
+                    }
+
+                    String messageId = messageContext.getMessageID();
+
+                    /** Remove messages which have less sequence number than required */
+                    /** NOTE(review): the scan moves on to the next index after this removal, so a message
+                     *  may be skipped in this pass if the store shifts its entries - confirm against the store */
+                    if (sequenceNo < processor.getNextSeqNo() && processor.getDeleteDuplicates()) {
+                        messageStore.remove(messageId);
+                    }
+
+                    /** Compare the next-to-go sequence number with current message sequence number */
+                    if (sequenceNo == processor.getNextSeqNo()) {
+                        /** Remove selected message from store */
+                        messageStore.remove(messageId);
+                        /** If sending does not failed increase sequence number */
+                        if (send(messageContext, sequence)) {
+                            processor.incrementNextSeqNo();
+                        }
+                        /** Break and start searching from beginning */
+                        break;
+                    }
+
+                }
+            }
+
+        } finally {
+            /** Reactivating Processor, also when an exception ends the job after deactivation */
+            if (!processor.isActive()) {
+                processor.activate();
+            }
+        }
+    }
+
+    /**
+     * Selects the smallest sequence number as the starting sequence number from a given message store.
+     * The processor is flagged as initialized only when a stored message carries a sequence number
+     * lower than the processor's current next-sequence number.
+     *
+     * @param processor    - Resequencing processor which is interested to know starting sequence number
+     * @param messageStore - Message store that contains messages
+     * @param seqNoxPath   - SynapseXpath object which contains the xpath to find the sequence number from a message
+     * @throws SynapseException - if a sequence number cannot be extracted or is not an integer
+     */
+    private void selectStartingSeqNo(ResequencingProcessor processor, MessageStore messageStore,
+                                     SynapseXPath seqNoxPath) {
+        /** Iterate through message store; a null entry marks the end of the stored messages */
+        for (int messageIndex = 0; ; messageIndex++) {
+            try {
+                MessageContext messageContext = messageStore.get(messageIndex);
+                if (messageContext == null) {
+                    break;
+                }
+                /** Without an xpath no sequence number can be read; the catch below reports it */
+                if (seqNoxPath == null) {
+                    throw new IllegalStateException("parameter "
+                            + ResequencingProcessor.SEQUENCE_NUMBER_XPATH + " is not specified");
+                }
+                /** Extract the sequence number from the message */
+                int sequenceNo = Integer.parseInt(seqNoxPath.stringValueOf(messageContext));
+                /** A smaller sequence number replaces the current next-sequence number */
+                if (sequenceNo < processor.getNextSeqNo()) {
+                    processor.setNextSeqNo(sequenceNo);
+                    processor.setInitSeqNo(new AtomicBoolean(true));
+                }
+            } catch (NumberFormatException e) {
+                handleException("Invalid xPath parameter - Sequence number specified is not an integer ");
+            } catch (Exception e) {
+                handleException("Failed to initialize starting sequence number at startup: " + e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Pauses the calling thread between checks while the starting sequence number is being
+     * initialized. If interrupted, the interrupt flag is restored so callers can observe it.
+     * @param timePeriod - the time in milliseconds to wait before a single cycle
+     */
+    private void delay(long timePeriod) {
+        try {
+            Thread.sleep(timePeriod);
+        } catch (InterruptedException e) {
+            log.error("Interrupted while thread sleeping in resequencer");
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Transmit the message in to a given sequence
+     * This method will takes the sequence given in sequence parameter. If no sequence is given this will return false     *
+     *
+     * @param messageContext - the content of the message that is transferred by Resequencer from message store
+     * @param sequence       - the sequence name that the message should be passed
+     * @return boolean         - to indicate the success of transferring the message
+     */
+    private boolean send(MessageContext messageContext, String sequence) {
+
+        Mediator processingSequence = messageContext.getSequence(sequence);
+        if (processingSequence != null) {
+            processingSequence.mediate(messageContext);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Central error handler of this job.
+     * Logs the given message at error level and then throws a SynapseException carrying it.
+     *
+     * @param msg - Error message to be logged and thrown
+     * @throws SynapseException - always, with msg as its message
+     */
+    private static void handleException(String msg) {
+        log.error(msg);
+        throw new SynapseException(msg);
+    }
+
+}

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/resequence/ResequencingProcessor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/resequence/ResequencingProcessor.java?rev=1487592&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/resequence/ResequencingProcessor.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/resequence/ResequencingProcessor.java Wed May 29 18:55:01 2013
@@ -0,0 +1,344 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.message.processors.resequence;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.message.processors.ScheduledMessageProcessor;
+import org.apache.synapse.message.store.MessageStore;
+import org.apache.synapse.util.xpath.SynapseXPath;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.SchedulerException;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class will be used as the processor of the resequencer and set up
+ * the necessary environment for the ResequencingJob.
+ * This should be run periodically after given time interval and
+ * for that this should be inherited from ScheduledMessageProcessor class
+ */
+public class ResequencingProcessor extends ScheduledMessageProcessor {
+
+    /**
+     * Log is set to the current class
+     */
+    private static final Log log = LogFactory.getLog(ResequencingProcessor.class);
+
+    /**
+     * State of the processor
+     */
+    private AtomicBoolean active = new AtomicBoolean(true);
+
+    /**
+     * To indicate whether the starting sequence number is set while initializing the processor
+     */
+    private AtomicBoolean initSeqNo = new AtomicBoolean(false);
+
+    /**
+     * Sequence number of the message that should be send next
+     */
+    private AtomicInteger nextSeqNo = new AtomicInteger(Integer.MAX_VALUE);
+
+    /**
+     * Number of messages interested to come
+     * Number of messages that the Resequencing processor should wait for before selecting the starting sequence number.
+     * Default value is 4
+     */
+    private AtomicInteger requiredInitMsgs = new AtomicInteger(4);
+
+    /**
+     * Time to wait for interested number of messages to come
+     */
+    private AtomicInteger requiredInitMsgsDelay = new AtomicInteger(5);
+
+    /**
+     * xpath expression to extract the sequence number
+     */
+    public static final String SEQUENCE_NUMBER_XPATH = "seqNumXpath";
+
+    /**
+     * Sequence that the messages should be passed to
+     */
+    public static final String NEXT_SEQUENCE = "nextEsbSequence";
+
+    /**
+     * Required initial number of messages
+     */
+    public static final String REQ_INIT_MSGS = "requiredInitMessages";
+
+    /**
+     * Delay until getting required number of messages receive
+     */
+    public static final String REQ_INIT_MSGS_DELAY = "requiredInitMessagesDelay";
+
+    /**
+     * Delay time period required for Resequencer processor while initializing starting sequence number
+     */
+    public static final int STARTING_NUMBER_INIT_DELAY = 6000;
+
+    /**
+     * Number of times currently processor waited until required number of messages come
+     * Max value is determined by requiredInitMsgsDelay variable value
+     */
+    public int tried = 0;
+
+    public static final String DELETE_DUPLICATES="deleteDuplicateMessages";
+
+    private AtomicBoolean deleteDuplicates=new AtomicBoolean(false);
+
+
+    /**
+     * Initiate the processor with SynapseEnvironment, then scan the message store for the
+     * smallest sequence number already present (see findFirstSeqNum).
+     * @param se - SynapseEnvironment to be set
+     */
+    @Override
+    public void init(SynapseEnvironment se) {
+        super.init(se);
+
+        /** Set the initial sequence number */
+        findFirstSeqNum();
+
+    }
+
+
+    /**
+     * Scans the configured message store at startup and takes the smallest sequence number found
+     * as the next sequence number. Marks the starting sequence number as initialized when one is found.
+     *
+     * @throws SynapseException - if the store is not available or a sequence number cannot be read
+     */
+    private void findFirstSeqNum() {
+        MessageStore store = configuration.getMessageStore(messageStore);
+        SynapseXPath seqNoxPath = null;
+        /** Report a missing store by name instead of failing with a bare NullPointerException */
+        if (store == null) {
+            handleException("Failed to initialize starting sequence number at startup: "
+                    + "message store " + messageStore + " is not available");
+        }
+        /** Extract the SynapseXpath object from parameters to identify the sequence number of the message */
+        if (parameters != null && parameters.get(ResequencingProcessor.SEQUENCE_NUMBER_XPATH) != null) {
+            seqNoxPath = (SynapseXPath) parameters.get(ResequencingProcessor.SEQUENCE_NUMBER_XPATH);
+        }
+        /** Iterate through message store; a null entry marks the end of the stored messages */
+        for (int messageIndex = 0; ; messageIndex++) {
+            try {
+                MessageContext messageContext = store.get(messageIndex);
+                if (messageContext == null) {
+                    break;
+                }
+                /** Without an xpath no sequence number can be read; the catch below reports it */
+                if (seqNoxPath == null) {
+                    throw new IllegalStateException("parameter " + SEQUENCE_NUMBER_XPATH + " is not specified");
+                }
+                /** Extract the sequence number from the message */
+                int sequenceNo = Integer.parseInt(seqNoxPath.stringValueOf(messageContext));
+                /** A smaller sequence number replaces the current next-sequence number */
+                if (sequenceNo < getNextSeqNo()) {
+                    setNextSeqNo(sequenceNo);
+                    /** To indicate that starting sequence number is initialized */
+                    initSeqNo = new AtomicBoolean(true);
+                }
+            } catch (NumberFormatException e) {
+                handleException("Invalid xPath parameter - Sequence number specified is not an integer ");
+            } catch (Exception e) {
+                handleException("Failed to initialize starting sequence number at startup: " + e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Get the job details with Name and Job Class.
+     * The job name built here is the same string that destroy() passes to the scheduler.
+     * @return jobDetail - created JobDetail object with Name and JobClass
+     */
+    @Override
+    protected JobDetail getJobDetail() {
+        JobDetail jobDetail = new JobDetail();
+        jobDetail.setName(name + "-resequensing-job");
+        jobDetail.setJobClass(ResequencingJob.class);
+        return jobDetail;
+    }
+
+    /**
+     * Get the job data map of the Resequencing job; only the processor instance is added here
+     *
+     * @return jobDataMap - created Job Data Map along with the processor instance
+     */
+    @Override
+    protected JobDataMap getJobDataMap() {
+        JobDataMap jdm = new JobDataMap();
+        jdm.put(PROCESSOR_INSTANCE, this);
+        return jdm;
+    }
+
+    /** Removes this processor's resequencing job from the scheduler; failures are only logged. */
+    @Override
+    public void destroy() {
+        try {
+            scheduler.deleteJob(name + "-resequensing-job",
+                    ScheduledMessageProcessor.SCHEDULED_MESSAGE_PROCESSOR_GROUP);
+        } catch (SchedulerException e) {
+            // Best-effort cleanup: report the failure instead of silently dropping it.
+            log.warn("Failed to delete the resequencing job of processor " + name, e);
+        }
+    }
+
+    /**
+     * Activate the Resequencing processor
+     * Set the active value to true
+     */
+    public void activate() {
+        active.set(true);
+    }
+
+    /**
+     * Check if the processor is active or not
+     *
+     * @return active - boolean expression that tells the status of the processor
+     */
+    public boolean isActive() {
+        return active.get();
+    }
+
+    /**
+     * De-activate the resequencing processor
+     * Set the active value to false
+     */
+    public void deactivate() {
+        active.set(false);
+    }
+
+    /**
+     * Returns the next sequence number
+     *
+     * @return nextSeqNo - The sequence number of the message that to be send next
+     */
+    public synchronized int getNextSeqNo() {
+        return nextSeqNo.get();
+    }
+
+    /**
+     * This method allow to change the value of nextSeqNo variable, which is used to determine
+     * the sequence number of the message next to go
+     *
+     * @param value - The value to set
+     */
+    public synchronized void setNextSeqNo(int value) {
+        nextSeqNo.set(value);
+    }
+
+    /**
+     * Increase the sequence number by one
+     */
+    public synchronized void incrementNextSeqNo() {
+        nextSeqNo.incrementAndGet();
+    }
+
+    /**
+     * Indicate whether the initial sequencer number is set
+     *
+     * @return initSeqNo - AtomicBoolean holding true once the starting sequence number is set
+     */
+    public AtomicBoolean isInitSeqNo() {
+        return initSeqNo;
+    }
+
+    /**
+     * Set or clear the initSeqNo value
+     *
+     * @param initSeqNo - boolean value to set
+     */
+    public void setInitSeqNo(AtomicBoolean initSeqNo) {
+        this.initSeqNo = initSeqNo;
+    }
+
+    /**
+     * Get the number initial messages required before set starting sequence number
+     *
+     * @return requiredInitMsgs - AtomicInteger holding the number of required messages
+     */
+    public AtomicInteger getRequiredInitMsgs() {
+        return requiredInitMsgs;
+    }
+
+    /**
+     * Set the number of messages required before set starting sequence number
+     *
+     * @param requiredInitMsgs - number of messages need to wait
+     */
+    public void setRequiredInitMsgs(AtomicInteger requiredInitMsgs) {
+        this.requiredInitMsgs = requiredInitMsgs;
+    }
+
+    /**
+     * Delay until requiredInitMsgs get set
+     *
+     * @return requiredInitMessagesDelay delay value
+     */
+    public AtomicInteger getRequiredInitMsgsDelay() {
+        return requiredInitMsgsDelay;
+    }
+
+    /**
+     * Set the delay until requiredInitMsgs get set
+     *
+     * @param requiredInitMsgsDelay - value for delay
+     */
+    public void setRequiredInitMsgsDelay(AtomicInteger requiredInitMsgsDelay) {
+        this.requiredInitMsgsDelay = requiredInitMsgsDelay;
+    }
+
+    /**
+     * Check whether to delete duplicate messages or not
+     * @return   value of deleteDuplicates
+     */
+    public boolean getDeleteDuplicates() {
+        return deleteDuplicates.get();
+    }
+
+    /**
+     * Set to delete duplicate messages
+     * @param deleteDuplicates - AtomicBoolean holding true if messages below the next sequence number are deleted
+     */
+    public void setDeleteDuplicates(AtomicBoolean deleteDuplicates) {
+        this.deleteDuplicates = deleteDuplicates;
+    }
+
+
+    /**
+     * Central error handler of this processor.
+     * Logs the given message at error level and then throws a SynapseException carrying it.
+     *
+     * @param msg - Error message to be logged and thrown
+     * @throws org.apache.synapse.SynapseException
+     *          - always, with msg as its message
+     */
+    private static void handleException(String msg) {
+        log.error(msg);
+        throw new SynapseException(msg);
+    }
+
+}
\ No newline at end of file

Modified: synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/MessageProcessorSerializationTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/MessageProcessorSerializationTest.java?rev=1487592&r1=1487591&r2=1487592&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/MessageProcessorSerializationTest.java (original)
+++ synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/MessageProcessorSerializationTest.java Wed May 29 18:55:01 2013
@@ -73,6 +73,28 @@ public class MessageProcessorSerializati
     }
 
     /**
+     * Test the Message Processor Creation and Serialization
+     * For a Basic Message processor with expressions.
+     */
+    public void testMesssageProcessorSerializationWithExpressions() {
+        String messageProcessorConfig = "<syn:messageProcessor xmlns:syn=\"" +
+                                        "http://ws.apache.org/ns/synapse\"" +
+                                        " name=\"foo\" " +
+                                        "class=\"org.apache.synapse.config.xml.MessageProcessorSerializationTest$TestMessageProcessor\" messageStore=\"bar\">" +
+                                        "<syn:parameter name=\"testName0\" xmlns:ns1=\"http://namespace1.synapse.org\" expression=\"//ns1:section/ns1:subSection\"/>"+ // expression parameter, no text value
+                                        "<syn:parameter name=\"testName1\">testValue1</syn:parameter>" +
+                                        "<syn:parameter name=\"testName2\">testValue2</syn:parameter>" +
+                                        "</syn:messageProcessor>";
+
+        OMElement messageProcessorElement = createOMElement(messageProcessorConfig);
+        MessageProcessor messageProcessor = MessageProcessorFactory.createMessageProcessor(messageProcessorElement); // XML -> processor
+        OMElement serializedElement = MessageProcessorSerializer.serializeMessageProcessor(null,
+                                                                                           messageProcessor); // processor -> XML
+
+        assertTrue(compare(messageProcessorElement, serializedElement)); // serialized form is compared with the input element
+    }
+
+    /**
      * This is a Test Message Processor implementation used only to test the XML Serialization
      */
     @SuppressWarnings("unused")