You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ch...@apache.org on 2011/02/11 07:16:59 UTC
svn commit: r1069682 - in /synapse/trunk/java/modules/core/src:
main/java/org/apache/synapse/ main/java/org/apache/synapse/config/xml/
main/java/org/apache/synapse/endpoints/
main/java/org/apache/synapse/mediators/store/
main/java/org/apache/synapse/me...
Author: charith
Date: Fri Feb 11 06:16:58 2011
New Revision: 1069682
URL: http://svn.apache.org/viewvc?rev=1069682&view=rev
Log:
Refactoring Synapse message store
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreMediatorFactory.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelViewMBean.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
Removed:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/RedeliveryProcessor.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/StorableMessage.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreSerializer.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/AbstractEndpoint.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreView.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreViewMBean.java
synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/MessageStoreSerializationTest.java
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java?rev=1069682&r1=1069681&r2=1069682&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java Fri Feb 11 06:16:58 2011
@@ -402,13 +402,6 @@ public final class SynapseConstants {
public static final int CALLOUT_OPERATION_FAILED = 401000;
- /**Message Context properties used for Message Store*/
-
- // Number of attempts that has been tried to redeliver the message
- public static final String MESSAGE_STORE_REDELIVERY_COUNT = "message.store.redelivery.count";
-
- // Does message as attemted all its redelivered
- public static final String MESSAGE_STORE_REDELIVERED = "message.store.redelivery.redelivered";
// Fail-safe mode properties
public static final String FAIL_SAFE_MODE_STATUS = "failsafe.mode.enable";
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java?rev=1069682&r1=1069681&r2=1069682&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java Fri Feb 11 06:16:58 2011
@@ -79,7 +79,8 @@ public class MediatorFactoryFinder imple
ConditionalRouterMediatorFactory.class,
SamplingThrottleMediatorFactory.class,
URLRewriteMediatorFactory.class,
- EnrichMediatorFactory.class
+ EnrichMediatorFactory.class,
+ MessageStoreMediatorFactory.class
};
private final static MediatorFactoryFinder instance = new MediatorFactoryFinder();
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java?rev=1069682&r1=1069681&r2=1069682&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java Fri Feb 11 06:16:58 2011
@@ -25,12 +25,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.SynapseException;
import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.message.processors.MessageProcessor;
import org.apache.synapse.message.store.InMemoryMessageStore;
import org.apache.synapse.message.store.MessageStore;
-import org.apache.synapse.message.store.RedeliveryProcessor;
import org.apache.axis2.util.JavaUtils;
+import javax.xml.XMLConstants;
import javax.xml.namespace.QName;
import java.util.Iterator;
import java.util.Map;
@@ -40,11 +41,14 @@ import java.util.Properties;
/**
* Create an instance of the given Message Store, and sets properties on it.
* <p/>
- * <messageStore name="string" class="classname" [sequence = "string" ] >
- * <redelivery>
- * <interval>delay in seconds </interval>
- * <maximumRedeliveries>maximum_number_of_redeliveries_to attempt </maximumRedeliveries>
- * </redelivery>
+ * <messageStore name="string" class="classname" [sequence = "string" ]>
+ * <<processor class="classname">>
+ * <</processor>>
+ * <parameter name="string">"string" </parameter>
+ * <parameter name="string">"string" </parameter>
+ * <parameter name="string">"string" </parameter>
+ * .
+ * .
* </messageStore>
*/
public class MessageStoreFactory {
@@ -55,20 +59,13 @@ public class MessageStoreFactory {
public static final QName NAME_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "name");
public static final QName SEQUENCE_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "sequence");
- private static final QName REDELIVERY_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE,
- "redelivery");
- private static final QName DELAY_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "interval");
- private static final QName MAX_REDELIVERIES = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE,
- "maximumRedeliveries");
- private static final QName ENABLE_EXPONENTIAL_BACKOFF = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE,
- "exponentialBackoff");
- private static final QName BACKOFF_MULTIPLIER = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE,
- "backoffMutiplier");
+ public static final QName PROCESSOR_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "processor");
public static final QName PARAMETER_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE,
"parameter");
private static final QName DESCRIPTION_Q
= new QName(SynapseConstants.SYNAPSE_NAMESPACE, "description");
+
@SuppressWarnings({"UnusedDeclaration"})
public static MessageStore createMessageStore(OMElement elem, Properties properties) {
@@ -86,7 +83,9 @@ public class MessageStoreFactory {
messageStore = new InMemoryMessageStore();
}
+
OMAttribute nameAtt = elem.getAttribute(NAME_Q);
+
if (nameAtt != null) {
messageStore.setName(nameAtt.getAttributeValue());
} else {
@@ -94,62 +93,35 @@ public class MessageStoreFactory {
}
OMAttribute sequenceAtt = elem.getAttribute(SEQUENCE_Q);
- if(sequenceAtt != null) {
+ if (sequenceAtt != null) {
messageStore.setSequence(sequenceAtt.getAttributeValue());
}
- OMElement redeliveryElem = elem.getFirstChildWithName(REDELIVERY_Q);
-
- if (redeliveryElem != null) {
- RedeliveryProcessor redeliveryProcessor = populateRedeliveryProcessor(redeliveryElem,
- messageStore);
- messageStore.setRedeliveryProcessor(redeliveryProcessor);
- }
-
OMElement descriptionElem = elem.getFirstChildWithName(DESCRIPTION_Q);
if (descriptionElem != null) {
messageStore.setDescription(descriptionElem.getText());
}
messageStore.setParameters(getParameters(elem));
- return messageStore;
- }
- private static RedeliveryProcessor populateRedeliveryProcessor(OMElement element,
- MessageStore messageStore) {
-
- RedeliveryProcessor redeliveryProcessor = new RedeliveryProcessor(messageStore);
-
- OMElement intervalElm = element.getFirstChildWithName(DELAY_Q);
- if (intervalElm != null) {
- int delay = 1000 * Integer.parseInt(intervalElm.getText());
- redeliveryProcessor.setRedeliveryDelay(delay);
- }
-
- OMElement maxRedeliveryElm = element.getFirstChildWithName(MAX_REDELIVERIES);
-
- if (maxRedeliveryElm != null) {
- int maxRedeliveries = Integer.parseInt(maxRedeliveryElm.getText());
- redeliveryProcessor.setMaxRedeleveries(maxRedeliveries);
+ OMElement processorElm = elem.getFirstChildWithName(PROCESSOR_Q);
+ MessageProcessor processor = null;
+ if (processorElm != null) {
+ processor = populateMessageProcessor(processorElm);
+ } else {
+ log.warn("Creating a Message Store without ");
}
- OMElement expBOElm = element.getFirstChildWithName(ENABLE_EXPONENTIAL_BACKOFF);
-
- if (expBOElm != null) {
- if (JavaUtils.isTrueExplicitly(expBOElm.getText())) {
- redeliveryProcessor.setExponentialBackoff(true);
- OMElement multiplierElm = element.getFirstChildWithName(BACKOFF_MULTIPLIER);
- if (multiplierElm != null) {
- int mulp = Integer.parseInt(multiplierElm.getText());
- redeliveryProcessor.setBackOffMultiplier(mulp);
- }
- }
+ if(processor != null) {
+ messageStore.setMessageProcessor(processor);
+ } else {
+ log.warn("Message Store Created with out a Message processor. ");
}
-
- return redeliveryProcessor;
+ return messageStore;
}
- private static Map<String,Object> getParameters(OMElement elem) {
+
+ private static Map<String, Object> getParameters(OMElement elem) {
Iterator params = elem.getChildrenWithName(PARAMETER_Q);
Map<String, Object> parameters = new HashMap<String, Object>();
@@ -168,7 +140,54 @@ public class MessageStoreFactory {
}
}
}
- return parameters ;
+ return parameters;
+ }
+
+    /**
+     * Creates the Message Processor described by a <code>processor</code> element.
+     * <p/>
+     * The implementation class name is read from the attribute identified by
+     * <code>CLASS_Q</code> and instantiated through its no-argument constructor. The text of
+     * every <code>PARAMETER_Q</code> child that carries a <code>NAME_Q</code> attribute is
+     * collected into a map, which is handed to the processor through
+     * <code>setParameters</code>.
+     *
+     * @param element the <code>processor</code> configuration element
+     * @return the configured Message Processor, or <code>null</code> when the element has no
+     *         class attribute
+     * @throws SynapseException if the class cannot be loaded or instantiated, or is not a
+     *                          MessageProcessor
+     */
+    private static MessageProcessor populateMessageProcessor(OMElement element) {
+        OMAttribute classAtt = element.getAttribute(CLASS_Q);
+        if (classAtt == null) {
+            log.warn("Message processor element does not specify a class. " +
+                    "Message Store will be created without a Message processor");
+            return null;
+        }
+
+        String className = classAtt.getAttributeValue();
+        try {
+            // Class.forName never returns null: a missing class surfaces as a
+            // ClassNotFoundException, which is handled by the generic catch below.
+            MessageProcessor processor =
+                    (MessageProcessor) Class.forName(className).newInstance();
+
+            Map<String, Object> parameters = new HashMap<String, Object>();
+            Iterator params = element.getChildrenWithName(PARAMETER_Q);
+            while (params.hasNext()) {
+                Object o = params.next();
+                if (!(o instanceof OMElement)) {
+                    continue;
+                }
+                OMElement prop = (OMElement) o;
+                OMAttribute paramName = prop.getAttribute(NAME_Q);
+                String paramValue = prop.getText();
+                if (paramName == null) {
+                    handleException(
+                            "Invalid Message Processor parameter - Parameter must have a name ");
+                } else if (paramValue != null) {
+                    parameters.put(paramName.getAttributeValue(), paramValue);
+                }
+            }
+            processor.setParameters(parameters);
+            return processor;
+        } catch (SynapseException e) {
+            // Already the exception type callers expect (e.g. raised for an unnamed
+            // parameter); rethrow it unchanged rather than wrapping it a second time.
+            throw e;
+        } catch (Exception e) {
+            // Hand the exception itself to the logger so the stack trace is not lost.
+            log.error("Error while Creating MessageProcessor " + className, e);
+            throw new SynapseException(e);
+        }
     }
private static void handleException(String msg) {
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreMediatorFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreMediatorFactory.java?rev=1069682&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreMediatorFactory.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreMediatorFactory.java Fri Feb 11 06:16:58 2011
@@ -0,0 +1,72 @@
+/*
+* Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+*
+* WSO2 Inc. licenses this file to you under the Apache License,
+* Version 2.0 (the "License"); you may not use this file except
+* in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.synapse.config.xml;
+
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.mediators.store.MessageStoreMediator;
+
+import javax.xml.namespace.QName;
+import java.util.Properties;
+
+/**
+ * Builds a {@link MessageStoreMediator} from its XML configuration.
+ *
+ * <pre>
+ * &lt;store messageStore = "message store name" [sequence = "sequence name"] /&gt;
+ * </pre>
+ *
+ * TODO Message store mediator will be improved with more user options
+ */
+public class MessageStoreMediatorFactory extends AbstractMediatorFactory {
+
+    /** Qualified name of the element handled by this factory. */
+    private static final QName STORE_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "store");
+    /** Mandatory attribute naming the target message store. */
+    private static final QName ATT_MESSAGE_STORE = new QName("messageStore");
+    /** Optional attribute naming the sequence passed to the mediator as its on-store sequence. */
+    private static final QName ATT_SEQUENCE = new QName("sequence");
+
+    /**
+     * Creates the mediator and copies the name, message store and sequence attributes of the
+     * configuration element onto it.
+     *
+     * @param elem       the <code>store</code> configuration element
+     * @param properties not used by this factory
+     * @return the populated {@link MessageStoreMediator}
+     * @throws SynapseException if the <code>messageStore</code> attribute is missing
+     */
+    @Override
+    protected Mediator createSpecificMediator(OMElement elem, Properties properties) {
+        MessageStoreMediator mediator = new MessageStoreMediator();
+
+        OMAttribute mediatorName = elem.getAttribute(ATT_NAME);
+        if (mediatorName != null) {
+            mediator.setName(mediatorName.getAttributeValue());
+        }
+
+        OMAttribute storeName = elem.getAttribute(ATT_MESSAGE_STORE);
+        if (storeName == null) {
+            throw new SynapseException("Message Store mediator must have a Message store defined");
+        }
+        mediator.setMessageStoreName(storeName.getAttributeValue());
+
+        OMAttribute onStoreSequence = elem.getAttribute(ATT_SEQUENCE);
+        if (onStoreSequence != null) {
+            mediator.setOnStoreSequence(onStoreSequence.getAttributeValue());
+        }
+
+        return mediator;
+    }
+
+    /**
+     * @return the qualified name of the <code>store</code> element
+     */
+    public QName getTagQName() {
+        return STORE_Q;
+    }
+}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreSerializer.java?rev=1069682&r1=1069681&r2=1069682&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreSerializer.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreSerializer.java Fri Feb 11 06:16:58 2011
@@ -27,20 +27,21 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
+import org.apache.synapse.message.processors.MessageProcessor;
import org.apache.synapse.message.store.MessageStore;
import org.apache.synapse.message.store.InMemoryMessageStore;
import javax.xml.namespace.QName;
import java.util.Iterator;
+import java.util.Map;
/**
* Serialize an instance of the given Message Store, and sets properties on it.
* <p/>
* <messageStore name="string" class="classname" [sequence = "string" ] >
- * <redelivery>
- * <interval>delay in seconds </interval>
- * <maximumRedeliveries>maximum_number_of_redeliveries_to attempt </maximumRedeliveries>
- * </redelivery>
+ * <parameter name="string">"string" </parameter>
+ * <parameter name="string">"string" </parameter>
+ * <parameter name="string">"string" </parameter>
* </messageStore>
*/
public class MessageStoreSerializer {
@@ -75,36 +76,6 @@ public class MessageStoreSerializer {
handleException("Message store Name not specified");
}
- //Redelivery processor
- OMElement redilevery = fac.createOMElement("redelivery", synNS);
- int reDeliveryDelay = messageStore.getRedeliveryProcessor().getRedeliveryDelay() / 1000;
-
- OMElement delay = fac.createOMElement("interval", synNS);
- delay.setText(String.valueOf(reDeliveryDelay));
-
- redilevery.addChild(delay);
-
- int maxRedeliveries = messageStore.getRedeliveryProcessor().getMaxRedeleveries();
- OMElement maxRedeliveryElm = fac.createOMElement("maximumRedeliveries", synNS);
- maxRedeliveryElm.setText(String.valueOf(maxRedeliveries));
-
- redilevery.addChild(maxRedeliveryElm);
-
- if (messageStore.getRedeliveryProcessor().isExponentialBackoffEnable()) {
- OMElement expBOElm = fac.createOMElement("exponentialBackoff", synNS);
- expBOElm.setText("true");
- redilevery.addChild(expBOElm);
-
- OMElement multiplierElm = fac.createOMElement("backoffMutiplier", synNS);
- int multiplier = messageStore.getRedeliveryProcessor().getBackOffMultiplier();
- if (multiplier > 0) {
- multiplierElm.setText(String.valueOf(multiplier));
- redilevery.addChild(multiplierElm);
- }
- }
-
- store.addChild(redilevery);
-
if (messageStore.getParameters() != null) {
Iterator iter = messageStore.getParameters().keySet().iterator();
while (iter.hasNext()) {
@@ -118,6 +89,9 @@ public class MessageStoreSerializer {
}
}
+ if(messageStore.getMessageProcessor() != null) {
+ populateProcessorElem(store,fac,messageStore);
+ }
if (getSerializedDescription(messageStore) != null) {
store.addChild(getSerializedDescription(messageStore));
}
@@ -140,6 +114,23 @@ public class MessageStoreSerializer {
}
}
+    /**
+     * Serializes the Message Processor of the given store as a <code>processor</code> child
+     * of the store element, with one <code>parameter</code> child per processor parameter.
+     *
+     * @param storeElem    the message store element that receives the processor element
+     * @param factory      factory used to create the new elements and attributes
+     * @param messageStore the store whose processor is serialized; the caller checks that
+     *                     it has a processor before calling this method
+     */
+    private static void populateProcessorElem(OMElement storeElem, OMFactory factory,
+                                              MessageStore messageStore) {
+        MessageProcessor processor = messageStore.getMessageProcessor();
+        OMElement processorElem = factory.createOMElement("processor", synNS);
+
+        // MessageStoreFactory instantiates the processor from the class named on the
+        // processor element; without it the serialized form cannot be read back.
+        processorElem.addAttribute(
+                factory.createOMAttribute("class", nullNS, processor.getClass().getName()));
+
+        Map<String, Object> parameters = processor.getParameters();
+        if (parameters != null) {
+            for (Map.Entry<String, Object> entry : parameters.entrySet()) {
+                OMElement paramElem = factory.createOMElement("parameter", synNS);
+                paramElem.addAttribute(
+                        factory.createOMAttribute("name", nullNS, entry.getKey()));
+                Object value = entry.getValue();
+                if (value != null) {
+                    // Values are declared as Object; toString() avoids a
+                    // ClassCastException for non-String values.
+                    paramElem.setText(value.toString());
+                }
+                processorElem.addChild(paramElem);
+            }
+        }
+        storeElem.addChild(processorElem);
+    }
+
private static void handleException(String msg) {
log.error(msg);
throw new SynapseException(msg);
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/AbstractEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/AbstractEndpoint.java?rev=1069682&r1=1069681&r2=1069682&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/AbstractEndpoint.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/AbstractEndpoint.java Fri Feb 11 06:16:58 2011
@@ -21,22 +21,21 @@ package org.apache.synapse.endpoints;
import org.apache.axis2.clustering.ClusteringAgent;
import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.transport.base.BaseConstants;
import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.transport.base.BaseConstants;
import org.apache.axis2.util.JavaUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.*;
-import org.apache.synapse.mediators.MediatorFaultHandler;
-import org.apache.synapse.message.store.StorableMessage;
-import org.apache.synapse.mediators.MediatorProperty;
-import org.apache.synapse.aspects.ComponentType;
import org.apache.synapse.aspects.AspectConfiguration;
+import org.apache.synapse.aspects.ComponentType;
import org.apache.synapse.aspects.statistics.StatisticsReporter;
import org.apache.synapse.commons.jmx.MBeanRegistrar;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.core.axis2.Axis2SynapseEnvironment;
+import org.apache.synapse.mediators.MediatorFaultHandler;
+import org.apache.synapse.mediators.MediatorProperty;
import java.util.*;
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java?rev=1069682&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java Fri Feb 11 06:16:58 2011
@@ -0,0 +1,86 @@
+/*
+* Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+*
+* WSO2 Inc. licenses this file to you under the Apache License,
+* Version 2.0 (the "License"); you may not use this file except
+* in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.synapse.mediators.store;
+
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.mediators.AbstractMediator;
+import org.apache.synapse.message.store.MessageStore;
+
+/**
+ * Mediator that hands the current message to the MessageStore registered in the Synapse
+ * configuration under {@link #getMessageStoreName()}.
+ */
+public class MessageStoreMediator extends AbstractMediator {
+
+    /** Name of this mediator. */
+    private String name;
+
+    /** Name under which the target MessageStore is looked up. */
+    private String messageStoreName;
+
+    /** Name of the sequence run by {@link #mediate(MessageContext)} just before storing. */
+    private String onStoreSequence;
+
+    /**
+     * Stores the message in the configured message store. When an on-store sequence is
+     * configured and can be resolved, it mediates the message first.
+     *
+     * @param synCtx the message to store
+     * @return <code>true</code> once the message has been handed to the store;
+     *         <code>false</code> if the context is <code>null</code> or no store is
+     *         registered under the configured name
+     */
+    public boolean mediate(MessageContext synCtx) {
+        if (synCtx == null) {
+            return false;
+        }
+
+        MessageStore targetStore = synCtx.getConfiguration().getMessageStore(messageStoreName);
+        if (targetStore == null) {
+            return false;
+        }
+
+        if (onStoreSequence != null) {
+            Mediator onStore = synCtx.getSequence(onStoreSequence);
+            if (onStore != null) {
+                onStore.mediate(synCtx);
+            }
+        }
+
+        targetStore.store(synCtx);
+        return true;
+    }
+
+    /** @return the name of this mediator */
+    public String getName() {
+        return name;
+    }
+
+    /** @param name the name of this mediator */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /** @return the name of the target message store */
+    public String getMessageStoreName() {
+        return messageStoreName;
+    }
+
+    /** @param messageStoreName the name of the target message store */
+    public void setMessageStoreName(String messageStoreName) {
+        this.messageStoreName = messageStoreName;
+    }
+
+    /** @return the name of the on-store sequence, or <code>null</code> if none is set */
+    public String getOnStoreSequence() {
+        return onStoreSequence;
+    }
+
+    /** @param onStoreSequence the name of the on-store sequence */
+    public void setOnStoreSequence(String onStoreSequence) {
+        this.onStoreSequence = onStoreSequence;
+    }
+}
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java?rev=1069682&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java Fri Feb 11 06:16:58 2011
@@ -0,0 +1,104 @@
+/*
+* Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+*
+* WSO2 Inc. licenses this file to you under the Apache License,
+* Version 2.0 (the "License"); you may not use this file except
+* in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.synapse.message.processors;
+
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.message.store.MessageStore;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Contract that every Synapse Message Processor must implement.
+ * A Message Processor works on the messages held by a backing MessageStore; how the
+ * messages are processed is left to the concrete implementation.
+ */
+public interface MessageProcessor {
+
+    /**
+     * Starts the Message Processor.
+     */
+    void start();
+
+    /**
+     * Stops the Message Processor.
+     */
+    void stop();
+
+    /**
+     * Sets the Message Store that backs this Message Processor.
+     *
+     * @param messageStore the underlying MessageStore instance
+     */
+    void setMessageStore(MessageStore messageStore);
+
+    /**
+     * Returns the Message Store that backs this Message Processor.
+     *
+     * @return the underlying MessageStore instance
+     */
+    MessageStore getMessageStore();
+
+    /**
+     * Sets the Mediator/sequence to be invoked just before a Message is processed.
+     *
+     * @param mediator Mediator/sequence instance to invoke just before processing a Message
+     */
+    void setOnProcessSequence(Mediator mediator);
+
+    /**
+     * Returns the on-process Mediator/sequence.
+     *
+     * @return Mediator/sequence instance invoked just before processing a Message
+     */
+    Mediator getOnProcessSequence();
+
+    /**
+     * Sets the Mediator/sequence to be invoked when a Message is submitted to this
+     * Message Processor.
+     *
+     * @param mediator Mediator/sequence instance to invoke when a Message is submitted
+     */
+    void setOnSubmitSequence(Mediator mediator);
+
+    /**
+     * Returns the on-submit Mediator/sequence.
+     *
+     * @return Mediator/sequence instance invoked when a Message is submitted
+     */
+    Mediator getOnSubmitSequence();
+
+    /**
+     * Sets the parameters used by the specific Message Processor implementation.
+     *
+     * @param parameters parameter values keyed by parameter name
+     */
+    void setParameters(Map<String, Object> parameters);
+
+    /**
+     * Returns the Message Processor parameters.
+     *
+     * @return parameter values keyed by parameter name
+     */
+    Map<String, Object> getParameters();
+
+    /**
+     * Tells whether the Message Processor has been started.
+     *
+     * @return <code>true</code> if the Message Processor is started
+     */
+    boolean isStarted();
+}
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java?rev=1069682&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java Fri Feb 11 06:16:58 2011
@@ -0,0 +1,108 @@
+/*
+* Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+*
+* WSO2 Inc. licenses this file to you under the Apache License,
+* Version 2.0 (the "License"); you may not use this file except
+* in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.synapse.message.processors.dlc;
+
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseArtifact;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.message.store.MessageStore;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Implementation of {@link DeadLetterChannelViewMBean} that operates directly on a
+ * MessageStore: messages can be listed, inspected, deleted or replayed through
+ * {@link RedeliveryProcessor}.
+ */
+public class DeadLetterChannelView implements DeadLetterChannelViewMBean {
+
+    private final MessageStore messageStore;
+
+    /**
+     * @param messageStore the store this view operates on
+     */
+    public DeadLetterChannelView(MessageStore messageStore) {
+        this.messageStore = messageStore;
+    }
+
+    /**
+     * Removes each message currently in the store and tries to redeliver it. Messages
+     * that cannot be redelivered are put back into the store.
+     */
+    public void resendAll() {
+        int size = messageStore.getSize();
+        for (int i = 0; i < size; i++) {
+            MessageContext messageContext = unstoreFirst();
+            if (messageContext != null) {
+                redeliver(messageContext);
+            }
+        }
+    }
+
+    /**
+     * Removes every message currently in the store.
+     */
+    public void deleteAll() {
+        int size = messageStore.getSize();
+        for (int i = 0; i < size; i++) {
+            messageStore.unstore(0, 0);
+        }
+    }
+
+    /**
+     * Lists the IDs of the stored messages without discarding the messages.
+     *
+     * @return the message IDs of the messages in the store
+     */
+    public List<String> getMessageIds() {
+        // The only access visible on the store here is removal, so take the messages out
+        // first and put every one of them back afterwards. Reading the IDs must not
+        // empty the store.
+        // NOTE(review): if MessageStore offers a non-destructive way to enumerate
+        // messages, prefer it over this remove-and-restore cycle.
+        int size = messageStore.getSize();
+        List<MessageContext> removed = new ArrayList<MessageContext>();
+        for (int i = 0; i < size; i++) {
+            MessageContext messageContext = unstoreFirst();
+            if (messageContext != null) {
+                removed.add(messageContext);
+            }
+        }
+
+        List<String> list = new ArrayList<String>();
+        for (MessageContext messageContext : removed) {
+            list.add(messageContext.getMessageID());
+            messageStore.store(messageContext);
+        }
+        return list;
+    }
+
+    /**
+     * Tries to redeliver the message with the given ID, if the store knows it.
+     *
+     * @param messageID ID of the message to be resent
+     */
+    public void resend(String messageID) {
+        // NOTE(review): the message is looked up with getMessage(), not unstore(); confirm
+        // whether it should be removed from the store before being replayed.
+        MessageContext messageContext = messageStore.getMessage(messageID);
+        if (messageContext != null) {
+            redeliver(messageContext);
+        }
+    }
+
+    /**
+     * Removes the message with the given ID from the store.
+     *
+     * @param messageID ID of the message to be deleted
+     */
+    public void delete(String messageID) {
+        messageStore.unstore(messageID);
+    }
+
+    /**
+     * @param messageID ID of the message to look up
+     * @return the string form of the message envelope, or <code>null</code> if the store
+     *         has no message with that ID
+     */
+    public String getEnvelope(String messageID) {
+        MessageContext messageContext = messageStore.getMessage(messageID);
+        if (messageContext != null) {
+            return messageContext.getEnvelope().toString();
+        }
+
+        return null;
+    }
+
+    /**
+     * @return the number of messages in the store
+     */
+    public int getSize() {
+        return messageStore.getSize();
+    }
+
+    /**
+     * Removes the message at the head of the store.
+     *
+     * @return the removed message, or <code>null</code> if the store returned nothing
+     */
+    private MessageContext unstoreFirst() {
+        List<MessageContext> removed = messageStore.unstore(0, 0);
+        if (removed == null || removed.isEmpty()) {
+            return null;
+        }
+        return removed.get(0);
+    }
+
+    /**
+     * Replays the message to its replay target (an Endpoint or a Mediator). The message is
+     * stored again when there is no such target or the replay is reported as failed.
+     */
+    private void redeliver(MessageContext messageContext) {
+        SynapseArtifact artifact = RedeliveryProcessor.getReplayTarget(messageContext);
+        boolean delivered = false;
+        if (artifact instanceof Endpoint) {
+            delivered = RedeliveryProcessor.handleEndpointReplay((Endpoint) artifact,
+                    messageContext);
+        } else if (artifact instanceof Mediator) {
+            delivered = RedeliveryProcessor.handleSequenceReplay((Mediator) artifact,
+                    messageContext);
+        }
+
+        if (!delivered) {
+            messageStore.store(messageContext);
+        }
+    }
+}
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelViewMBean.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelViewMBean.java?rev=1069682&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelViewMBean.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelViewMBean.java Fri Feb 11 06:16:58 2011
@@ -0,0 +1,72 @@
+/*
+* Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+*
+* WSO2 Inc. licenses this file to you under the Apache License,
+* Version 2.0 (the "License"); you may not use this file except
+* in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.synapse.message.processors.dlc;
+
+import java.util.List;
+
+/**
+ * JMX management interface of the dead letter channel: operations to resend, delete
+ * and inspect the messages held in a message store.
+ */
+public interface DeadLetterChannelViewMBean {
+
+ /**
+ * Try resending all messages stored in the message store.
+ */
+ public void resendAll();
+
+ /**
+ * Delete all the messages in the message store.
+ */
+ public void deleteAll();
+
+
+ /**
+ * Get the message IDs of the messages in the message store.
+ *
+ * @return a list of message ID values
+ */
+ public List<String> getMessageIds();
+
+ /**
+ * Resend the message with the given ID.
+ * This operation returns nothing, so the outcome of the attempt is not
+ * reported to the caller.
+ *
+ * @param messageID ID of the message to be resent
+ */
+ public void resend(String messageID);
+
+ /**
+ * Delete the message with the given ID.
+ *
+ * @param messageID ID of the message to be deleted
+ */
+ public void delete(String messageID);
+
+ /**
+ * Get the SOAP envelope of the message with the given ID.
+ *
+ * @param messageID ID of the message to be returned
+ * @return the SOAP envelope content as a string
+ */
+ public String getEnvelope(String messageID);
+
+ /**
+ * Get the number of messages in the message store.
+ *
+ * @return the number of messages stored in the store
+ */
+ public int getSize();
+
+}
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java?rev=1069682&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java Fri Feb 11 06:16:58 2011
@@ -0,0 +1,287 @@
+/*
+* Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+*
+* WSO2 Inc. licenses this file to you under the Apache License,
+* Version 2.0 (the "License"); you may not use this file except
+* in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.synapse.message.processors.dlc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.*;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.message.processors.MessageProcessor;
+import org.apache.synapse.message.store.MessageStore;
+import org.apache.synapse.securevault.commons.MBeanRegistrar;
+
+import java.util.Map;
+
+/**
+ * Redelivery processor is the message processor which implements the Dead Letter Channel EIP.
+ * From time to time it redelivers the stored messages to a given target.
+ */
+public class RedeliveryProcessor implements MessageProcessor {
+
+ private static final Log log = LogFactory.getLog(RedeliveryProcessor.class);
+
+ /**
+ * Associated MessageStore
+ */
+ private MessageStore messageStore;
+
+ /**
+ * Mediator handed to setOnProcessSequence; this class only stores and returns it
+ */
+ private Mediator onProcessMediator;
+
+ /**
+ * Mediator handed to setOnSubmitSequence; this class only stores and returns it
+ */
+ private Mediator onSubmitMediator;
+
+ /**
+ * Parameter map as handed to setParameters
+ */
+ private Map<String, Object> parameters;
+
+ /**
+ * Maximum number of redeliveries per message
+ */
+ private int maxRedeleveries = 0;
+
+ /**
+ * Delay between two consecutive redelivery attempts; passed to Thread.sleep,
+ * so the unit is milliseconds
+ */
+ private int redeliveryDelay = 2000;
+
+ /**
+ * enable/disable growing the delay with the number of attempts already made
+ */
+ private boolean exponentialBackoff = false;
+
+ /**
+ * the multiplier that will be used in the exponential backoff algorithm;
+ * while it is -1 the delay grows linearly with the attempt number instead
+ */
+ private int backOffMultiplier = -1;
+
+ /**
+ * JMX view created in setMessageStore
+ */
+ private DeadLetterChannelViewMBean dlcView;
+
+ /**
+ * Running flag: polled by the worker loop, returned by isStarted() and cleared by
+ * stop(). Declared volatile because stop() can be called from a thread other than
+ * the worker thread, and the worker must see that write.
+ */
+ private volatile boolean started;
+
+ /** Parameter key: delay between redelivery attempts */
+ public static final String REDELIVERY_DELAY = "redelivery.delay";
+
+ /** Parameter key: maximum number of redeliveries per message */
+ public static final String MAX_REDELIVERY_COUNT = "redelivery.count";
+
+ /** Parameter key: "true" enables the backoff behaviour */
+ public static final String EXPONENTIAL_BACKOFF = "redelivery.exponentialBackoff";
+
+ /** Parameter key: backoff multiplier */
+ public static final String BACKOFF_MUTIPLIER = "redelivery.backoffMultiplier";
+
+
+ /** Message property naming the endpoint a message is replayed to */
+ public static final String REPLAY_ENDPOINT = "replay.endpoint";
+
+ /** Message property naming the sequence a message is replayed to */
+ public static final String REPLAY_SEQUENCE = "replay.sequence";
+
+ /** Message property holding the redelivery attempt counter as a String */
+ public static final String NO_OF_REDELIVERIES = "number.of.redeliveries";
+
+ /**
+ * Starts the redelivery worker thread unless the processor is already marked as
+ * started. The started flag is raised before the thread is launched: the worker
+ * loop runs only while the flag is true, so without it the worker would return
+ * immediately and every later call would launch yet another thread.
+ */
+ public void start() {
+ if (!started) {
+ started = true;
+ Thread t = new Thread(new Worker());
+ t.start();
+ }
+ }
+
+ /**
+ * Clears the started flag. The worker loop tests the flag at the top of each
+ * iteration, so a running worker completes the iteration it is in before it exits.
+ */
+ public void stop() {
+ started = false;
+ }
+
+ /**
+ * Associates a message store with this processor. For a non-null store a
+ * DeadLetterChannelView is created for it, kept as the DLC view of this processor
+ * and registered with the MBeanRegistrar under the store's name.
+ *
+ * @param messageStore the store to process; null only records the reference
+ */
+ public void setMessageStore(MessageStore messageStore) {
+ this.messageStore = messageStore;
+ if (messageStore == null) {
+ return;
+ }
+ DeadLetterChannelView jmxView = new DeadLetterChannelView(messageStore);
+ this.dlcView = jmxView;
+ MBeanRegistrar.getInstance().registerMBean(jmxView, "Dead Letter Channel",
+ messageStore.getName());
+ }
+
+ /**
+ * @return the message store last passed to setMessageStore, or null if none was set
+ */
+ public MessageStore getMessageStore() {
+ return this.messageStore;
+ }
+
+ /**
+ * Records the on-process mediator. Within this class the value is only stored
+ * and returned by getOnProcessSequence.
+ *
+ * @param mediator the mediator to record
+ */
+ public void setOnProcessSequence(Mediator mediator) {
+ this.onProcessMediator = mediator;
+ }
+
+ /**
+ * @return the mediator last passed to setOnProcessSequence, or null if none was set
+ */
+ public Mediator getOnProcessSequence() {
+ return this.onProcessMediator;
+ }
+
+ /**
+ * Records the on-submit mediator. Within this class the value is only stored
+ * and returned by getOnSubmitSequence.
+ *
+ * @param mediator the mediator to record
+ */
+ public void setOnSubmitSequence(Mediator mediator) {
+ this.onSubmitMediator = mediator;
+ }
+
+ /**
+ * @return the mediator last passed to setOnSubmitSequence, or null if none was set
+ */
+ public Mediator getOnSubmitSequence() {
+ return this.onSubmitMediator;
+ }
+
+ /**
+ * Stores the parameter map and reads the redelivery settings from it. Recognised
+ * keys are REDELIVERY_DELAY, MAX_REDELIVERY_COUNT, EXPONENTIAL_BACKOFF and
+ * BACKOFF_MUTIPLIER; keys that are absent leave the current setting untouched.
+ * The numeric values must be Strings that Integer.parseInt accepts, and backoff
+ * is only switched on by the exact String "true".
+ *
+ * @param parameters the parameter map; null is stored as-is and changes no setting
+ * @throws NumberFormatException if a numeric parameter cannot be parsed
+ * @throws ClassCastException if a numeric parameter is not a String
+ */
+ public void setParameters(Map<String, Object> parameters) {
+ this.parameters = parameters;
+ if (parameters == null) {
+ // nothing to read: keep the current redelivery settings instead of failing
+ return;
+ }
+
+ if (parameters.containsKey(REDELIVERY_DELAY)) {
+ redeliveryDelay = Integer.parseInt((String) parameters.get(REDELIVERY_DELAY));
+ }
+
+ if (parameters.containsKey(MAX_REDELIVERY_COUNT)) {
+ maxRedeleveries = Integer.parseInt((String) parameters.get(MAX_REDELIVERY_COUNT));
+ }
+
+ if (parameters.containsKey(EXPONENTIAL_BACKOFF)) {
+ if ("true".equals(parameters.get(EXPONENTIAL_BACKOFF))) {
+ exponentialBackoff = true;
+ }
+ }
+
+ if (parameters.containsKey(BACKOFF_MUTIPLIER)) {
+ backOffMultiplier = Integer.parseInt((String) parameters.get(BACKOFF_MUTIPLIER));
+ }
+ }
+
+ /**
+ * @return the map last passed to setParameters (the same instance, not a copy),
+ * or null if none was set
+ */
+ public Map<String, Object> getParameters() {
+ return parameters;
+ }
+
+
+ private class Worker implements Runnable {
+
+ public void run() {
+ while (started) {
+
+
+ try {
+ synchronized (this) {
+ int delay = redeliveryDelay;
+
+ MessageContext messageContext;
+ messageContext = messageStore.getMessages(0, 0).get(0);
+
+ if (messageContext == null) {
+ continue;
+ }
+
+ SynapseArtifact artifact = getReplayTarget(messageContext);
+ messageStore.unstore(0, 0);
+ if (messageContext.getProperty(NO_OF_REDELIVERIES) == null) {
+ messageContext.setProperty(NO_OF_REDELIVERIES, "0");
+ delay = redeliveryDelay;
+ }
+
+ String numberS = (String) messageContext.getProperty(NO_OF_REDELIVERIES);
+ int number = Integer.parseInt(numberS);
+
+ if (number >= maxRedeleveries) {
+
+ if (log.isDebugEnabled()) {
+ log.debug("Maximum number of attempts tried for Message with ID " +
+ messageContext.getMessageID() +
+ "will be put back to the Message Store");
+
+ }
+ messageStore.store(messageContext);
+ continue;
+ }
+
+ messageContext.setProperty(NO_OF_REDELIVERIES, "" + (number + 1));
+
+
+ if (exponentialBackoff && backOffMultiplier == -1) {
+ delay = (number + 1) * redeliveryDelay;
+
+ } else if (exponentialBackoff) {
+ delay = (int) Math.pow(backOffMultiplier, number) * redeliveryDelay;
+
+ }
+
+
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException ignored) {
+
+ }
+
+ if (artifact instanceof Endpoint) {
+ if (!handleEndpointReplay((Endpoint) artifact, messageContext)) {
+ messageStore.store(messageContext);
+ }
+ } else if (artifact instanceof Mediator) {
+ if (!handleSequenceReplay((Mediator) artifact, messageContext)) {
+ messageStore.store(messageContext);
+ }
+ } else {
+ messageStore.store(messageContext);
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("sent \n" + messageContext.getEnvelope());
+ }
+
+
+ }
+ } catch (Throwable e) {
+ log.warn("Error while Running Redelivery process " +e.getMessage());
+ }
+
+ }
+
+ }
+ }
+
+ /**
+ * Resolves the artifact a message should be replayed to. The REPLAY_ENDPOINT
+ * property is consulted first; only when it is absent is REPLAY_SEQUENCE used.
+ *
+ * @param context the message whose replay target is wanted
+ * @return the endpoint or sequence looked up by name in the message's configuration
+ * (which may itself be null if no such artifact is defined), or null when
+ * neither property is set
+ */
+ public static SynapseArtifact getReplayTarget(MessageContext context) {
+ // An endpoint target takes priority over a sequence target
+ Object endpointName = context.getProperty(REPLAY_ENDPOINT);
+ if (endpointName != null) {
+ return context.getConfiguration().getDefinedEndpoints().get((String) endpointName);
+ }
+
+ Object sequenceName = context.getProperty(REPLAY_SEQUENCE);
+ if (sequenceName != null) {
+ return context.getConfiguration().getSequence((String) sequenceName);
+ }
+
+ return null;
+ }
+
+
+ /**
+ * Sends the message through the endpoint if the endpoint reports it is ready.
+ *
+ * @param endpoint the endpoint to send through
+ * @param messageContext the message to send
+ * @return true if send was called, false if the endpoint was not ready to send
+ */
+ public static boolean handleEndpointReplay(Endpoint endpoint, MessageContext messageContext) {
+ if (!endpoint.readyToSend()) {
+ return false;
+ }
+
+ endpoint.send(messageContext);
+ return true;
+ }
+
+
+ /**
+ * Runs the given mediator on the message.
+ *
+ * @param mediator the mediator (sequence) to run
+ * @param messageContext the message to mediate
+ * @return always true once mediate returns normally
+ */
+ public static boolean handleSequenceReplay(Mediator mediator, MessageContext messageContext) {
+ // NOTE(review): any result of mediate is not consulted here - confirm this is intended
+ mediator.mediate(messageContext);
+ return true;
+ }
+
+ /**
+ * Get the DLC related JMX API
+ * @return the Dead Letter Channel JMX view created in setMessageStore,
+ * or null if no message store has been set
+ */
+ public DeadLetterChannelViewMBean getDlcView() {
+ return dlcView;
+ }
+
+ /**
+ * Get the started status of the Message processor
+ * @return current value of the started flag (true/false)
+ */
+ public boolean isStarted() {
+ return started;
+ }
+}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java?rev=1069682&r1=1069681&r2=1069682&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java Fri Feb 11 06:16:58 2011
@@ -22,13 +22,11 @@ package org.apache.synapse.message.store
import org.apache.synapse.ManagedLifecycle;
import org.apache.synapse.Mediator;
import org.apache.synapse.MessageContext;
-import org.apache.synapse.SynapseConstants;
import org.apache.synapse.commons.jmx.MBeanRegistrar;
import org.apache.synapse.config.SynapseConfiguration;
import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.message.processors.MessageProcessor;
-import java.util.LinkedList;
-import java.util.Queue;
import java.util.Map;
public abstract class AbstractMessageStore implements MessageStore, ManagedLifecycle {
@@ -39,16 +37,6 @@ public abstract class AbstractMessageSto
protected String name;
/**
- * associated redelivery processor
- */
- protected RedeliveryProcessor redeliveryProcessor;
-
- /**
- * queue that holds the sheduled messages
- */
- protected Queue<StorableMessage> scheduledMessageQueue = new LinkedList<StorableMessage>();
-
- /**
* name of the sequence to be executed before storing the message
*/
protected String sequence;
@@ -59,17 +47,17 @@ public abstract class AbstractMessageSto
protected MessageStoreView messageStoreMBean;
/**
- * synapse configuration reffrence
+ * synapse configuration reference
*/
protected SynapseConfiguration synapseConfiguration;
/**
- * synapse environment reffrence
+ * synapse environment reference
*/
protected SynapseEnvironment synapseEnvironment;
/**
- * Message store properties
+ * Message store parameters
*/
protected Map<String,Object> parameters;
@@ -83,6 +71,11 @@ public abstract class AbstractMessageSto
*/
protected String fileName;
+ /**
+ * Message processor instance associated with the MessageStore
+ */
+ protected MessageProcessor processor;
+
public void init(SynapseEnvironment se) {
this.synapseEnvironment = se;
this.synapseConfiguration = synapseEnvironment.getSynapseConfiguration();
@@ -96,41 +89,13 @@ public abstract class AbstractMessageSto
this.name = name;
messageStoreMBean = new MessageStoreView(name, this);
MBeanRegistrar.getInstance().registerMBean(messageStoreMBean,
- "DeadLetterChannel", this.name);
- }
-
- public void setRedeliveryProcessor(RedeliveryProcessor redeliveryProcessor) {
- this.redeliveryProcessor = redeliveryProcessor;
- }
-
- public RedeliveryProcessor getRedeliveryProcessor() {
- return redeliveryProcessor;
+ "MessageStore", this.name);
}
- public void schedule(StorableMessage storableMessage) {
- if (storableMessage != null) {
- scheduledMessageQueue.add(storableMessage);
- }
-
- if (scheduledMessageQueue.size() > 0 && redeliveryProcessor != null &&
- !redeliveryProcessor.isStarted()) {
- redeliveryProcessor.start();
- }
- }
-
- public StorableMessage dequeueScheduledQueue() {
- return scheduledMessageQueue.poll();
- }
-
- public StorableMessage getFirstSheduledMessage() {
- return scheduledMessageQueue.peek();
- }
-
protected void mediateSequence(MessageContext synCtx) {
- if (sequence != null && synCtx != null && "true".equalsIgnoreCase(
- (String) synCtx.getProperty(SynapseConstants.MESSAGE_STORE_REDELIVERED))) {
+ if (sequence != null && synCtx != null) {
Mediator seq = synCtx.getSequence(sequence);
if (seq != null) {
seq.mediate(synCtx);
@@ -138,6 +103,14 @@ public abstract class AbstractMessageSto
}
}
+ public void setMessageProcessor(MessageProcessor messageProcessor) {
+ this.processor = messageProcessor;
+ }
+
+ public MessageProcessor getMessageProcessor() {
+ return processor;
+ }
+
public int getSize() {
return -1;
}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java?rev=1069682&r1=1069681&r2=1069682&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java Fri Feb 11 06:16:58 2011
@@ -21,6 +21,7 @@ package org.apache.synapse.message.store
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.MessageContext;
import java.util.*;
@@ -32,46 +33,86 @@ public class InMemoryMessageStore extend
private static final Log log = LogFactory.getLog(InMemoryMessageStore.class);
/** The map that keeps the stored messages */
- private Map<String, StorableMessage> messageList = new HashMap<String, StorableMessage>();
+ private Map<String, MessageContext> messageList = new HashMap<String, MessageContext>();
- public void store(StorableMessage storableMessage) {
- if (storableMessage != null) {
- mediateSequence(storableMessage.getMessageContext());
- messageList.put(storableMessage.getMessageContext().getMessageID(), storableMessage);
+ public void store(MessageContext messageContext) {
+ if (messageContext != null) {
+ mediateSequence(messageContext);
+ messageList.put(messageContext.getMessageID(), messageContext);
+
+ /**
+ * If the associated processor is not started we start it. When storing the message
+ */
+ if(!processor.isStarted()) {
+ processor.start();
+ }
if (log.isDebugEnabled()) {
- log.debug("Message " + storableMessage.getMessageContext().getMessageID() +
+ log.debug("Message " + messageContext.getMessageID() +
" has been stored");
}
}
}
- public StorableMessage unstore(String messageID) {
+ public MessageContext unstore(String messageID) {
if (messageID != null) {
return messageList.remove(messageID);
}
return null;
}
- public List<StorableMessage> unstoreAll() {
- List<StorableMessage> returnList = new ArrayList<StorableMessage>();
+ public List<MessageContext> unstoreAll() {
+ List<MessageContext> returnList = new ArrayList<MessageContext>();
for (String k : messageList.keySet()) {
returnList.add(messageList.remove(k));
}
return returnList;
}
- public List<StorableMessage> getAllMessages() {
- List<StorableMessage> returnlist = new ArrayList<StorableMessage>();
-
+ public List<MessageContext> unstore(int maxNumberOfMessages) {
+ List<MessageContext> returnList = new ArrayList<MessageContext>();
Iterator<String> it = messageList.keySet().iterator();
- while (it.hasNext()) {
- returnlist.add(messageList.get(it.next()));
+ while (it.hasNext() && maxNumberOfMessages > 0) {
+ returnList.add(messageList.get(it.next()));
+ maxNumberOfMessages--;
}
+ return returnList;
+ }
+
+ public List<MessageContext> unstore(int from, int to) {
+ List<MessageContext> returnlist = new ArrayList<MessageContext>();
+ if (from <= to && (from <= messageList.size() && to <= messageList.size())) {
+
+ String[] keys = messageList.keySet().toArray(new String[0]);
+
+ for (int i = from; i <= to; i++) {
+ returnlist.add(messageList.remove(keys[i]));
+ }
+ }
+ return returnlist;
+ }
+
+ public List<MessageContext> getMessages(int from, int to) {
+ List<MessageContext> returnlist = new ArrayList<MessageContext>();
+ if (from <= to && (from <= messageList.size() && to <= messageList.size())) {
+ String[] keys = messageList.keySet().toArray(new String[0]);
+
+ for (int i = from; i <= to; i++) {
+ returnlist.add(messageList.get(keys[i]));
+ }
+ }
return returnlist;
}
- public StorableMessage getMessage(String messageId) {
+ public List<MessageContext> getAllMessages() {
+ List<MessageContext> returnList = new ArrayList<MessageContext>();
+ for (Map.Entry<String ,MessageContext> entry :messageList.entrySet()) {
+ returnList.add(entry.getValue());
+ }
+ return returnList;
+ }
+
+ public MessageContext getMessage(String messageId) {
if (messageId != null) {
return messageList.get(messageId);
}
@@ -79,6 +120,18 @@ public class InMemoryMessageStore extend
return null;
}
+ public List<MessageContext> getMessages(int maxNumberOfMessages) {
+ List<MessageContext> returnList = new ArrayList<MessageContext>();
+
+ Iterator<String> it = messageList.keySet().iterator();
+ while (it.hasNext() && maxNumberOfMessages > 0) {
+ returnList.add(messageList.get(it.next()));
+ maxNumberOfMessages--;
+ }
+
+ return returnList;
+ }
+
public int getSize() {
return messageList.size();
}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java?rev=1069682&r1=1069681&r2=1069682&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java Fri Feb 11 06:16:58 2011
@@ -19,9 +19,11 @@
package org.apache.synapse.message.store;
+import org.apache.synapse.MessageContext;
import org.apache.synapse.config.SynapseConfiguration;
import org.apache.synapse.SynapseArtifact;
import org.apache.synapse.Nameable;
+import org.apache.synapse.message.processors.MessageProcessor;
import java.util.List;
import java.util.Map;
@@ -33,71 +35,56 @@ import java.util.Map;
public interface MessageStore extends SynapseArtifact, Nameable {
/**
- * store the Message in the Message Store
- * Underlying message store implementation must handle the efficient way of storing the Message
- * @param storableMessage wrapper of the Message context
+ * Store the Message in the Message Store
+ * @param messageContext MessageContext to be saved
*/
- public void store(StorableMessage storableMessage);
+ public void store(MessageContext messageContext);
/**
- * Store the Message in schedule queue to redeliver
- *
- * @param storableMessage A StorableMessage instance
+ * Delete and return the MessageContext with given Message id
+ * @param messageID message id of the Message
+ * @return MessageContext instance
*/
- public void schedule(StorableMessage storableMessage);
-
-
- public StorableMessage dequeueScheduledQueue();
+ public MessageContext unstore(String messageID);
/**
- * return the Message That is on top of the queue
- *
- * @return A StorableMessage instance or null
+ * Delete all the Messages in the Message Store
+ * @return List of all messages in store
*/
- public StorableMessage getFirstSheduledMessage();
+ public List<MessageContext> unstoreAll();
- /**
- * Unstore the Message with Given Message Id from the MessageStore
- * @param messageID a message ID string
- * @return unstored Message
- */
- public StorableMessage unstore(String messageID);
/**
- * Delete all the Messages in the Message Store
- * @return List of all messages in store
+ * Unstore Messages from index 'from' to index 'to'.
+ * Message ordering depends on the implementation.
+ * @param from start index
+ * @param to stop index
+ * @return list of messages that belong to the given range
*/
- public List<StorableMessage> unstoreAll();
+ public List<MessageContext> unstore(int from , int to);
/**
* Get the All messages in the Message store without removing them from the queue
* @return List of all Messages
*/
- public List<StorableMessage> getAllMessages();
+ public List<MessageContext> getAllMessages();
/**
* Get the Message with the given ID from the Message store without removing it
* @param messageId A message ID string
* @return Message with given ID
*/
- public StorableMessage getMessage(String messageId);
+ public MessageContext getMessage(String messageId);
- /**
- * Set the redelivery processor instance associated with the Message Store
- * redelivery processor have the responsibility of redelivery message according
- * to a policy defined
- *
- * @param redeliveryProcessor The redelivery processor to be registered
- */
- public void setRedeliveryProcessor(RedeliveryProcessor redeliveryProcessor);
/**
- * Return the redelivery processor instance associated with the message store
- *
- * @return A RedlieveryProcessor or null
+ * Get Messages from index 'from' to index 'to'.
+ * Message ordering depends on the implementation.
+ * @param from start index
+ * @param to stop index
+ * @return list of messages that belong to the given range
*/
- public RedeliveryProcessor getRedeliveryProcessor();
-
+ public List<MessageContext> getMessages(int from , int to);
/**
* set the implementation specific parameters
* @param parameters A map of parameters or null
@@ -156,5 +143,17 @@ public interface MessageStore extends Sy
* @return Name of the file where this artifact is defined
*/
public String getFileName();
-
+
+
+ /**
+ * Set the Message Processor Associated with the Message Store
+ * @param messageProcessor message processor instance associated with message store
+ */
+ public void setMessageProcessor(MessageProcessor messageProcessor);
+
+ /**
+ * Get the Message Processor associated with the MessageStore
+ * @return message processor instance associated with the message store
+ */
+ public MessageProcessor getMessageProcessor();
}
\ No newline at end of file
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreView.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreView.java?rev=1069682&r1=1069681&r2=1069682&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreView.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreView.java Fri Feb 11 06:16:58 2011
@@ -21,6 +21,7 @@ package org.apache.synapse.message.store
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.MessageContext;
import java.util.ArrayList;
import java.util.List;
@@ -38,19 +39,6 @@ public class MessageStoreView implements
this.messageStore = messageStore;
}
- public void resendAll() {
- List<StorableMessage> list = messageStore.getAllMessages();
-
- for(int i = 0; i < list.size(); i++) {
- StorableMessage m = list.get(i);
- // wait till the endpoint is ready
- while(!m.getEndpoint().readyToSend());
- //resend
- m.getEndpoint().send(m.getMessageContext());
- }
-
- log.info("All Messages in Message Store " +messageStoreName+ " were resent");
- }
public void deleteAll() {
messageStore.unstoreAll();
@@ -60,36 +48,17 @@ public class MessageStoreView implements
public List<String> getMessageIds() {
List<String> returnList = new ArrayList<String>();
- List<StorableMessage> list = messageStore.getAllMessages();
+ List<MessageContext> list = messageStore.getAllMessages();
- for(StorableMessage m : list) {
- returnList.add(m.getMessageContext().getMessageID());
+ for(MessageContext m : list) {
+ returnList.add(m.getMessageID());
}
return returnList;
}
- public boolean resend(String messageID) {
-
- StorableMessage m = messageStore.getMessage(messageID);
-
- if (m != null) {
- if (m.getEndpoint().readyToSend()) {
- m.getEndpoint().send(m.getMessageContext());
- log.info("Message with ID " + messageID + " resent via the Endpoint" +
- m.getEndpoint().getName());
- return true;
- } else {
- log.info("Message with ID " + messageID +" unable resent via the Endpoint" +
- m.getEndpoint().getName());
- }
- }
-
- return false;
- }
-
public void delete(String messageID) {
if(messageID != null) {
- StorableMessage m =messageStore.unstore(messageID);
+ MessageContext m =messageStore.unstore(messageID);
if (m != null){
log.info("Message with ID :" + messageID + " removed from the MessageStore");
}
@@ -98,10 +67,10 @@ public class MessageStoreView implements
public String getEnvelope(String messageID) {
if (messageID != null) {
- StorableMessage m = messageStore.getMessage(messageID);
+ MessageContext m = messageStore.getMessage(messageID);
if (m != null) {
- return m.getMessageContext().getEnvelope().toString();
+ return m.getEnvelope().toString();
}
}
return null;
@@ -110,4 +79,8 @@ public class MessageStoreView implements
public int getSize() {
return messageStore.getSize();
}
+
+ public void delete(int maxCount) {
+ messageStore.unstore(0,maxCount-1);
+ }
}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreViewMBean.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreViewMBean.java?rev=1069682&r1=1069681&r2=1069682&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreViewMBean.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreViewMBean.java Fri Feb 11 06:16:58 2011
@@ -24,31 +24,24 @@ import java.util.List;
public interface MessageStoreViewMBean {
/**
- * try resending all messages stored in the message store via associated endpoints.
- */
- public void resendAll();
-
- /**
* Delete all the Messages in Message store
*/
public void deleteAll();
/**
+ * Delete the given number of Messages from the MessageStore
+ * @param maxCount maximum number of messages to delete
+ */
+ public void delete(int maxCount);
+
+ /**
* Get the Message IDs of all stored Messages in the Message store
*
* @return a list of message ID values
*/
public List<String> getMessageIds();
- /**
- * Resend the Message with the given id
- * return false if fail to re try deliver the message
- *
- * @param messageID ID of the message to be resent
- * @return true if the resend operation was successful and false otherwise
- */
- public boolean resend(String messageID);
/**
* Delete the Message with Given id
@@ -58,7 +51,7 @@ public interface MessageStoreViewMBean {
public void delete(String messageID);
/**
- * Get the SOAP envelope of the given Messaage with given ID
+ * Get the SOAP envelope of the given Message with given ID
*
* @param messageID ID of the message to be returned
* @return the SOAP envelope content as a string
Modified: synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/MessageStoreSerializationTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/MessageStoreSerializationTest.java?rev=1069682&r1=1069681&r2=1069682&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/MessageStoreSerializationTest.java (original)
+++ synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/MessageStoreSerializationTest.java Fri Feb 11 06:16:58 2011
@@ -39,12 +39,6 @@ public class MessageStoreSerializationTe
String messageStoreConfiguration = "<syn:messageStore xmlns:syn=\"" +
"http://ws.apache.org/ns/synapse\"" +
" name=\"foo\" sequence=\"seq1\" >" +
- "<syn:redelivery>" +
- "<syn:interval>1</syn:interval>" +
- "<syn:maximumRedeliveries>5</syn:maximumRedeliveries>" +
- "<syn:exponentialBackoff>true</syn:exponentialBackoff>" +
- "<syn:backoffMutiplier>2</syn:backoffMutiplier>" +
- "</syn:redelivery>" +
"</syn:messageStore>";
OMElement messageStoreElement = createOMElement(messageStoreConfiguration);