You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ch...@apache.org on 2013/05/29 20:55:02 UTC
svn commit: r1487592 - in /synapse/trunk/java/modules/core/src:
main/java/org/apache/synapse/config/xml/
main/java/org/apache/synapse/message/processors/resequence/
test/java/org/apache/synapse/config/xml/
Author: charith
Date: Wed May 29 18:55:01 2013
New Revision: 1487592
URL: http://svn.apache.org/r1487592
Log:
Applying patch for SYNAPSE-893 .Thanks Buddhima Wijeweera for contribution.
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/resequence/
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/resequence/ResequencingJob.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/resequence/ResequencingProcessor.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorFactory.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorSerializer.java
synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/MessageProcessorSerializationTest.java
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorFactory.java?rev=1487592&r1=1487591&r2=1487592&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorFactory.java Wed May 29 18:55:01 2013
@@ -25,6 +25,8 @@ import org.apache.commons.logging.LogFac
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
import org.apache.synapse.message.processors.MessageProcessor;
+import org.apache.synapse.util.xpath.SynapseXPath;
+import org.jaxen.JaxenException;
import javax.xml.namespace.QName;
import java.util.HashMap;
@@ -47,24 +49,26 @@ public class MessageProcessorFactory {
private static final Log log = LogFactory.getLog(MessageProcessorFactory.class);
public static final QName CLASS_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "class");
public static final QName NAME_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "name");
+ public static final QName EXPRESSION_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "expression");
public static final QName PARAMETER_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE,
- "parameter");
- public static final QName MESSAGE_STORE_Q = new QName(XMLConfigConstants.NULL_NAMESPACE ,
- "messageStore");
+ "parameter");
+ public static final QName MESSAGE_STORE_Q = new QName(XMLConfigConstants.NULL_NAMESPACE,
+ "messageStore");
private static final QName DESCRIPTION_Q
= new QName(SynapseConstants.SYNAPSE_NAMESPACE, "description");
/**
* Creates a Message processor instance from given xml configuration element
+ *
* @param elem OMElement of that contain the Message processor configuration
- * @return created message processor instance
+ * @return created message processor instance
*/
public static MessageProcessor createMessageProcessor(OMElement elem) {
MessageProcessor processor = null;
OMAttribute clssAtt = elem.getAttribute(CLASS_Q);
- if(clssAtt != null) {
+ if (clssAtt != null) {
try {
Class cls = Class.forName(clssAtt.getAttributeValue());
processor = (MessageProcessor) cls.newInstance();
@@ -86,7 +90,7 @@ public class MessageProcessorFactory {
OMAttribute storeAtt = elem.getAttribute(MESSAGE_STORE_Q);
- if(storeAtt != null) {
+ if (storeAtt != null) {
assert processor != null;
processor.setMessageStoreName(storeAtt.getAttributeValue());
} else {
@@ -115,9 +119,19 @@ public class MessageProcessorFactory {
OMElement prop = (OMElement) o;
OMAttribute paramName = prop.getAttribute(NAME_Q);
String paramValue = prop.getText();
+ OMAttribute paramExpression = prop.getAttribute(EXPRESSION_Q);
if (paramName != null) {
- if (paramValue != null) {
+ if (paramExpression != null) {
+ try {
+ SynapseXPath expression = SynapseXPathFactory.getSynapseXPath(prop, paramExpression.getAttributeValue());
+ parameters.put(paramName.getAttributeValue(), expression);
+ } catch (JaxenException e) {
+ handleException("Error while creating expression " + e.getMessage());
+ }
+ } else if (paramValue != null) {
parameters.put(paramName.getAttributeValue(), paramValue);
+ } else {
+ handleException("Invalid MessageStore parameter - Parameter must have a value or an expression ");
}
} else {
handleException("Invalid MessageStore parameter - Parameter must have a name ");
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorSerializer.java?rev=1487592&r1=1487591&r2=1487592&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorSerializer.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorSerializer.java Wed May 29 18:55:01 2013
@@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFac
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
import org.apache.synapse.message.processors.MessageProcessor;
+import org.apache.synapse.util.xpath.SynapseXPath;
import javax.xml.namespace.QName;
import java.util.Iterator;
@@ -39,6 +40,7 @@ import java.util.Iterator;
* <parameter name="string">"string" <parameter>
* <parameter name="string">"string" <parameter>
* <parameter name="string">"string" <parameter>
+ * <parameter name="string" expression="string">
* .
* .
* </messageProcessor>
@@ -55,15 +57,17 @@ public class MessageProcessorSerializer
/**
* Serialize a give Message processor instance to XML configuration
- * @param parent parent configuration
+ *
+ * @param parent parent configuration
* @param processor message processor instance
- * @return created XML configuration
+ * @return created XML configuration
*/
- public static OMElement serializeMessageProcessor(OMElement parent, MessageProcessor processor) {
+ public static OMElement serializeMessageProcessor(OMElement parent,
+ MessageProcessor processor) {
OMElement processorElem = fac.createOMElement("messageProcessor", synNS);
if (processor != null) {
processorElem.addAttribute(fac.createOMAttribute("class", nullNS,
- processor.getClass().getName()));
+ processor.getClass().getName()));
} else {
handleException("Invalid processor. Provider is required");
}
@@ -74,21 +78,32 @@ public class MessageProcessorSerializer
handleException("Message store Name not specified");
}
- if(processor.getMessageStoreName() != null) {
+ if (processor.getMessageStoreName() != null) {
processorElem.addAttribute(fac.createOMAttribute(
- "messageStore",nullNS,processor.getMessageStoreName()));
+ "messageStore", nullNS, processor.getMessageStoreName()));
}
if (processor.getParameters() != null) {
Iterator iterator = processor.getParameters().keySet().iterator();
while (iterator.hasNext()) {
String name = (String) iterator.next();
- String value = (String) processor.getParameters().get(name);
+ Object object = processor.getParameters().get(name);
OMElement property = fac.createOMElement("parameter", synNS);
property.addAttribute(fac.createOMAttribute(
"name", nullNS, name));
- property.setText(value.trim());
+
+ if (object instanceof String) {
+ String value = (String) object;
+ property.setText(value.trim());
+
+ } else if (object instanceof SynapseXPath) {
+ SynapseXPath expression = (SynapseXPath) object;
+ SynapseXPathSerializer.serializeXPath(expression,property,"expression");
+
+ }
+
processorElem.addChild(property);
+
}
}
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/resequence/ResequencingJob.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/resequence/ResequencingJob.java?rev=1487592&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/resequence/ResequencingJob.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/resequence/ResequencingJob.java Wed May 29 18:55:01 2013
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.message.processors.resequence;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.message.processors.MessageProcessorConsents;
+import org.apache.synapse.message.processors.ScheduledMessageProcessor;
+import org.apache.synapse.message.store.MessageStore;
+import org.apache.synapse.util.xpath.SynapseXPath;
+import org.quartz.Job;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * All necessary logic for Resequencing is implemented with in this class.
+ * This class extends from Job class which comes from Quartz
+ */
+public class ResequencingJob implements Job {
+
+ /**
+ * Log is set to the current class
+ */
+ private static final Log log = LogFactory.getLog(ResequencingJob.class);
+
+ /**
+ * This method will takes the necessary parameters from parameter list and do the resequencing
+ * Resequencing is done through reading messages until the next-to-send message is found
+ * If required is not found then waits until the next instance is created.
+ *
+ * @param jobExecutionContext - a bundle with information related to environment
+ * @throws JobExecutionException - to indicate Quartz scheduler that an error occurred while executing the job
+ */
+ public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
+
+ final JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
+ final MessageStore messageStore = (MessageStore) jdm.get(MessageProcessorConsents.MESSAGE_STORE);
+ final ResequencingProcessor processor = (ResequencingProcessor) jdm.get(
+ ScheduledMessageProcessor.PROCESSOR_INSTANCE);
+
+ final Map<String, Object> parameters = (Map<String, Object>) jdm.get(MessageProcessorConsents.PARAMETERS);
+ final String sequence = (String) parameters.get(ResequencingProcessor.NEXT_SEQUENCE);
+
+ SynapseXPath seqNoxPath = null;
+
+ /** Checking for activation of processor or existence of message store */
+ if (!processor.isActive() || messageStore == null) {
+ return;
+ }
+
+ /** Extract the SynapseXpath object from parameters to identify the sequence number of the message */
+ if (parameters != null && parameters.get(ResequencingProcessor.SEQUENCE_NUMBER_XPATH) != null) {
+ seqNoxPath = (SynapseXPath) parameters.get(ResequencingProcessor.SEQUENCE_NUMBER_XPATH);
+ }
+
+ /** Extract the number of messages interested to come */
+ if (parameters != null && parameters.get(ResequencingProcessor.REQ_INIT_MSGS) != null) {
+ processor.setRequiredInitMsgs(new AtomicInteger(Integer.parseInt((String) parameters.get(
+ ResequencingProcessor.REQ_INIT_MSGS))));
+ }
+ /** Extract the delay wait until the interested messages come */
+ if (parameters != null && parameters.get(ResequencingProcessor.REQ_INIT_MSGS_DELAY) != null) {
+ processor.setRequiredInitMsgsDelay(new AtomicInteger(Integer.parseInt((String) parameters.get(
+ ResequencingProcessor.REQ_INIT_MSGS_DELAY))));
+ }
+
+ /** Extract whether to delete duplicate messages */
+ if (parameters != null && parameters.get(ResequencingProcessor.DELETE_DUPLICATES) != null) {
+ String result=(String) parameters.get(ResequencingProcessor.DELETE_DUPLICATES);
+ if(result.equalsIgnoreCase("TRUE")){
+ processor.setDeleteDuplicates(new AtomicBoolean(true));
+ }
+ }
+
+
+ if (!processor.isInitSeqNo().get()) {
+ /** Deactivating Resequencing processor to avoid executing multiple Job instances */
+ processor.deactivate();
+
+ do {
+ delay(ResequencingProcessor.STARTING_NUMBER_INIT_DELAY);
+
+ if (messageStore.size() >= processor.getRequiredInitMsgs().get()) {
+ selectStartingSeqNo(processor, messageStore, seqNoxPath);
+ break;
+ }
+ processor.tried++;
+ } while (processor.tried < processor.getRequiredInitMsgsDelay().get());
+
+ }
+
+ if (!processor.isInitSeqNo().get()) {
+
+ while (true) {
+
+ if (messageStore.size() > 0) {
+ selectStartingSeqNo(processor, messageStore, seqNoxPath);
+
+ if (!processor.isInitSeqNo().get()) {
+ log.warn("Resequencer failed to select starting sequence number with in given timeout !");
+ }
+
+ break;
+ }
+
+ }
+
+ }
+
+ /** Continue to this section happens only after initializing the starting sequence number */
+ boolean errorStop = false;
+ while (!errorStop) {
+
+ /** Iterate through message store */
+ for (int messageIndex = 0; ; messageIndex++) {
+ MessageContext messageContext = messageStore.get(messageIndex);
+
+ if (messageContext == null) {
+ errorStop = true;
+ break;
+ }
+
+ /** Extract the sequence number from the message */
+ int sequenceNo;
+ try {
+ sequenceNo = Integer.parseInt(seqNoxPath.stringValueOf(messageContext));
+ } catch (Exception e) {
+ log.warn("Can't Find sequence number from message " + e.getMessage());
+ continue;
+ }
+
+ String messageId = messageContext.getMessageID();
+
+ /** Remove messages which have less sequence number than required */
+ if(sequenceNo<processor.getNextSeqNo() && processor.getDeleteDuplicates()){
+ messageStore.remove(messageId);
+ }
+
+ /** Compare the next-to-go sequence number with current message sequence number */
+ if (sequenceNo == processor.getNextSeqNo()) {
+
+ /** Remove selected message from store */
+
+ messageStore.remove(messageId);
+ /** If sending does not failed increase sequence number */
+ if (send(messageContext, sequence)) {
+
+ processor.incrementNextSeqNo();
+ }
+ /** Break and start searching from beginning */
+ break;
+ }
+
+
+ }
+
+ }
+
+ /** Reactivating Processor after selecting initial sequence number */
+ if (!processor.isActive()) {
+ processor.activate();
+ }
+
+ }
+
+ /**
+ * Selects the smallest sequence number as the starting sequence number from a given message store
+ *
+ * @param processor - Resequencing processor which is interested to know starting sequence number
+ * @param messageStore - Message store that contains messages
+ * @param seqNoxPath - SynapseXpath object which contains the xpath to find the sequence number from a message
+ */
+ private void selectStartingSeqNo(ResequencingProcessor processor, MessageStore messageStore,
+ SynapseXPath seqNoxPath) {
+ /** Iterate through message store */
+ for (int messageIndex = 0; ; messageIndex++) {
+ try {
+ MessageContext messageContext = messageStore.get(messageIndex);
+ if (messageContext == null) {
+ break;
+ }
+ /** Extract the sequence number from the message */
+ int sequenceNo;
+
+ sequenceNo = Integer.parseInt(seqNoxPath.stringValueOf(messageContext));
+
+
+ /** If the sequence number is smaller that current next-sequence number, current next-sequence number get replaced */
+ if (sequenceNo < processor.getNextSeqNo()) {
+ processor.setNextSeqNo(sequenceNo);
+ processor.setInitSeqNo(new AtomicBoolean(true));
+ }
+
+
+ } catch (NumberFormatException e) {
+ handleException("Invalid xPath parameter - Sequence number specified is not an integer ");
+ } catch (Exception e) {
+ handleException("Failed to initialize starting sequence number at startup: " + e.getMessage());
+ }
+ }
+
+
+ }
+
+ /**
+ * To timePeriod the processor until next checking up
+ * This method is in use wen initializing the starting sequence number of the resequencer
+ *
+ * @param timePeriod - the time period which waits before a single cycle
+ */
+ private void delay(long timePeriod) {
+ try {
+ Thread.sleep(timePeriod);
+ } catch (InterruptedException e) {
+ log.error(new String("Interrupted while thread sleeping in resequencer"));
+ }
+ }
+
+ /**
+ * Transmit the message in to a given sequence
+ * This method will takes the sequence given in sequence parameter. If no sequence is given this will return false *
+ *
+ * @param messageContext - the content of the message that is transferred by Resequencer from message store
+ * @param sequence - the sequence name that the message should be passed
+ * @return boolean - to indicate the success of transferring the message
+ */
+ private boolean send(MessageContext messageContext, String sequence) {
+
+ Mediator processingSequence = messageContext.getSequence(sequence);
+ if (processingSequence != null) {
+ processingSequence.mediate(messageContext);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Handling errors are done here.
+ * This will log the error messages and throws SynapseException
+ *
+ * @param msg - Error message to be set
+ * @throws SynapseException - Exception related to Synapse at Runtime
+ */
+ private static void handleException(String msg) {
+ log.error(msg);
+ throw new SynapseException(msg);
+ }
+
+}
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/resequence/ResequencingProcessor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/resequence/ResequencingProcessor.java?rev=1487592&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/resequence/ResequencingProcessor.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/resequence/ResequencingProcessor.java Wed May 29 18:55:01 2013
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.message.processors.resequence;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.message.processors.ScheduledMessageProcessor;
+import org.apache.synapse.message.store.MessageStore;
+import org.apache.synapse.util.xpath.SynapseXPath;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.SchedulerException;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class will be used as the processor of the resequencer and set up
+ * the necessary environment for the ReequencingJob.
+ * This should be run periodically after given time interval and
+ * for that this should be inherited from ScheduledMessageProcessor class
+ */
+public class ResequencingProcessor extends ScheduledMessageProcessor {
+
+ /**
+ * Log is set to the current class
+ */
+ private static final Log log = LogFactory.getLog(ResequencingProcessor.class);
+
+ /**
+ * State of the processor
+ */
+ private AtomicBoolean active = new AtomicBoolean(true);
+
+ /**
+ * To indicate whether the starting sequence number is set while initializing the processor
+ */
+ private AtomicBoolean initSeqNo = new AtomicBoolean(false);
+
+ /**
+ * Sequence number of the message that should be send next
+ */
+ private AtomicInteger nextSeqNo = new AtomicInteger(Integer.MAX_VALUE);
+
+ /**
+ * Number of messages interested to come
+ * Number of messages that the Resequencing processor should wait for before selecting the starting sequence number.
+ * Default value is 4
+ */
+ private AtomicInteger requiredInitMsgs = new AtomicInteger(4);
+
+ /**
+ * Time to wait for interested number of messages to come
+ */
+ private AtomicInteger requiredInitMsgsDelay = new AtomicInteger(5);
+
+ /**
+ * xpath expression to extract the sequence number
+ */
+ public static final String SEQUENCE_NUMBER_XPATH = "seqNumXpath";
+
+ /**
+ * Sequence that the messages should be passed to
+ */
+ public static final String NEXT_SEQUENCE = "nextEsbSequence";
+
+ /**
+ * Required initial number of messages
+ */
+ public static final String REQ_INIT_MSGS = "requiredInitMessages";
+
+ /**
+ * Delay until getting required number of messages receive
+ */
+ public static final String REQ_INIT_MSGS_DELAY = "requiredInitMessagesDelay";
+
+ /**
+ * Delay time period required for Resequencer processor while initializing starting sequence number
+ */
+ public static final int STARTING_NUMBER_INIT_DELAY = 6000;
+
+ /**
+ * Number of times currently processor waited until required number of messages come
+ * Max value is determined by requiredInitMsgs variable value
+ */
+ public int tried = 0;
+
+ public static final String DELETE_DUPLICATES="deleteDuplicateMessages";
+
+ private AtomicBoolean deleteDuplicates=new AtomicBoolean(false);
+
+
+ /**
+ * Initiate the processor with SynapseEnvironment
+ *
+ * @param se - SynapseEnvironment to be set
+ */
+ @Override
+ public void init(SynapseEnvironment se) {
+ super.init(se);
+
+ /** Set the initial sequence number */
+ findFirstSeqNum();
+
+ }
+
+
+ /**
+ * This method use to find the minimum sequence number in the message store at the startup
+ */
+ private void findFirstSeqNum() {
+
+
+ MessageStore store = configuration.getMessageStore(messageStore);
+ SynapseXPath seqNoxPath = null;
+
+ /** Extract the SynapseXpath configuration.getMessageStore(messageStore)object from parameters to
+ * identify the sequence number of the message */
+ if (parameters != null && parameters.get(ResequencingProcessor.SEQUENCE_NUMBER_XPATH) != null) {
+ seqNoxPath = (SynapseXPath) parameters.get(ResequencingProcessor.SEQUENCE_NUMBER_XPATH);
+ }
+
+
+ /** Iterate through message store */
+ for (int messageIndex = 0; ; messageIndex++) {
+
+ try {
+ MessageContext messageContext = store.get(messageIndex);
+ if (messageContext == null) {
+ break;
+ }
+
+ /** Extract the sequence number from the message */
+ int sequenceNo = Integer.parseInt(seqNoxPath.stringValueOf(messageContext));
+
+ /** If the sequence number is smaller that current next-sequence number, current next-sequence
+ * number get replaced */
+ if (sequenceNo < getNextSeqNo()) {
+ setNextSeqNo(sequenceNo);
+ /** To indicate that starting sequence number is initialized */
+ initSeqNo = new AtomicBoolean(true);
+ }
+
+ } catch (NumberFormatException e) {
+ handleException("Invalid xPath parameter - Sequence number specified is not an integer ");
+ } catch (Exception e) {
+ handleException("Failed to initialize starting sequence number at startup: " + e.getMessage());
+ }
+ }
+
+ }
+
+ /**
+ * Get the job details with Name and Job Class
+ *
+ * @return jobDetail - created JobDetail object with Name and JobClass
+ */
+ @Override
+ protected JobDetail getJobDetail() {
+ JobDetail jobDetail = new JobDetail();
+ jobDetail.setName(name + "-resequensing-job");
+ jobDetail.setJobClass(ResequencingJob.class);
+ return jobDetail;
+ }
+
+ /**
+ * Get the map that contains parameters related to Resequencing job
+ *
+ * @return jobDataMap - created Job Data Map along with the processor instance
+ */
+ @Override
+ protected JobDataMap getJobDataMap() {
+ JobDataMap jdm = new JobDataMap();
+ jdm.put(PROCESSOR_INSTANCE, this);
+ return jdm;
+ }
+
+ /**
+ * Destroy the processor's resequencing job
+ */
+ @Override
+ public void destroy() {
+ try {
+ scheduler.deleteJob(name + "-resequensing-job",
+ ScheduledMessageProcessor.SCHEDULED_MESSAGE_PROCESSOR_GROUP);
+ } catch (SchedulerException e) {
+ }
+ }
+
+ /**
+ * Activate the Resequencing processor
+ * Set the active value to true
+ */
+ public void activate() {
+ active.set(true);
+ }
+
+ /**
+ * Check if the processor is active or not
+ *
+ * @return active - boolean expression that tells the status of the processor
+ */
+ public boolean isActive() {
+ return active.get();
+ }
+
+ /**
+ * De-activate the resequencing processor
+ * Set the active value to false
+ */
+ public void deactivate() {
+ active.set(false);
+ }
+
+ /**
+ * Returns the next sequence number
+ *
+ * @return nextSeqNo - The sequence number of the message that to be send next
+ */
+ public synchronized int getNextSeqNo() {
+ return nextSeqNo.get();
+ }
+
+ /**
+ * This method allow to change the value of nextSeqNo variable, which is used to determine
+ * the sequence number of the message next to go
+ *
+ * @param value - The value to set
+ */
+ public synchronized void setNextSeqNo(int value) {
+ nextSeqNo.set(value);
+ }
+
+ /**
+ * Increase the sequence number by one
+ */
+ public synchronized void incrementNextSeqNo() {
+ nextSeqNo.incrementAndGet();
+ }
+
+ /**
+ * Indicate whether the initial sequencer number is set
+ *
+ * @return initSeqNo - boolean value containing true or false
+ */
+ public AtomicBoolean isInitSeqNo() {
+ return initSeqNo;
+ }
+
+ /**
+ * Set or clear the initSeqNo value
+ *
+ * @param initSeqNo - boolean value to set
+ */
+ public void setInitSeqNo(AtomicBoolean initSeqNo) {
+ this.initSeqNo = initSeqNo;
+ }
+
+ /**
+ * Get the number initial messages required before set starting sequence number
+ *
+ * @return requiredInitMsgs - int value of required messages
+ */
+ public AtomicInteger getRequiredInitMsgs() {
+ return requiredInitMsgs;
+ }
+
+ /**
+ * Set the number of messages required before set starting sequence number
+ *
+ * @param requiredInitMsgs - number of messages need to wait
+ */
+ public void setRequiredInitMsgs(AtomicInteger requiredInitMsgs) {
+ this.requiredInitMsgs = requiredInitMsgs;
+ }
+
+ /**
+ * Delay until requiredInitMsgs get set
+ *
+ * @return requiredInitMessagesDelay delay value
+ */
+ public AtomicInteger getRequiredInitMsgsDelay() {
+ return requiredInitMsgsDelay;
+ }
+
+ /**
+ * Set the delay until requiredInitMsgs get set
+ *
+ * @param requiredInitMsgsDelay - value for delay
+ */
+ public void setRequiredInitMsgsDelay(AtomicInteger requiredInitMsgsDelay) {
+ this.requiredInitMsgsDelay = requiredInitMsgsDelay;
+ }
+
+ /**
+ * Check whether to delete duplicate messages or not
+ * @return value of deleteDuplicates
+ */
+ public boolean getDeleteDuplicates() {
+ return deleteDuplicates.get();
+ }
+
+ /**
+ * Set to delete duplicate messages
+ * @param deleteDuplicates
+ */
+ public void setDeleteDuplicates(AtomicBoolean deleteDuplicates) {
+ this.deleteDuplicates = deleteDuplicates;
+ }
+
+
+ /**
+ * Handling errors are done here.
+ * This will log the error messages and throws SynapseException
+ *
+ * @param msg - Error message to be set
+ * @throws org.apache.synapse.SynapseException
+ * - Exception related to Synapse at Runtime
+ */
+ private static void handleException(String msg) {
+ log.error(msg);
+ throw new SynapseException(msg);
+ }
+
+}
\ No newline at end of file
Modified: synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/MessageProcessorSerializationTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/MessageProcessorSerializationTest.java?rev=1487592&r1=1487591&r2=1487592&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/MessageProcessorSerializationTest.java (original)
+++ synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/MessageProcessorSerializationTest.java Wed May 29 18:55:01 2013
@@ -73,6 +73,28 @@ public class MessageProcessorSerializati
}
/**
+ * Test the Message Processor Creation and Serialization
+ * For a Basic Message processor with expressions.
+ */
+ public void testMesssageProcessorSerializationWithExpressions() {
+ String messageProcessorConfig = "<syn:messageProcessor xmlns:syn=\"" +
+ "http://ws.apache.org/ns/synapse\"" +
+ " name=\"foo\" " +
+ "class=\"org.apache.synapse.config.xml.MessageProcessorSerializationTest$TestMessageProcessor\" messageStore=\"bar\">" +
+ "<syn:parameter name=\"testName0\" xmlns:ns1=\"http://namespace1.synapse.org\" expression=\"//ns1:section/ns1:subSection\"/>"+
+ "<syn:parameter name=\"testName1\">testValue1</syn:parameter>" +
+ "<syn:parameter name=\"testName2\">testValue2</syn:parameter>" +
+ "</syn:messageProcessor>";
+
+ OMElement messageProcessorElement = createOMElement(messageProcessorConfig);
+ MessageProcessor messageProcessor = MessageProcessorFactory.createMessageProcessor(messageProcessorElement);
+ OMElement serializedElement = MessageProcessorSerializer.serializeMessageProcessor(null,
+ messageProcessor);
+
+ assertTrue(compare(messageProcessorElement, serializedElement));
+ }
+
+ /**
* This is a Test Message Processor implementation used only to test the XML Serialization
*/
@SuppressWarnings("unused")