You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@synapse.apache.org by as...@apache.org on 2008/01/13 16:24:37 UTC
svn commit: r611582 - in
/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse:
config/xml/ mediators/ mediators/eip/ mediators/eip/aggregator/
mediators/eip/splitter/ util/
Author: asankha
Date: Sun Jan 13 07:24:37 2008
New Revision: 611582
URL: http://svn.apache.org/viewvc?rev=611582&view=rev
Log:
fix issues with the aggrgate mediator and review code and fix issues with EIP mediators package
Modified:
webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java
webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java
webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/TargetFactory.java
webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/TargetSerializer.java
webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java
webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPConstants.java
webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java
webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/Target.java
webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java
webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/CloneMediator.java
webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/IterateMediator.java
webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/util/MessageHelper.java
Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java Sun Jan 13 07:24:37 2008
@@ -20,7 +20,6 @@
package org.apache.synapse.config.xml;
import org.apache.synapse.Mediator;
-import org.apache.synapse.SynapseException;
import org.apache.synapse.mediators.eip.aggregator.AggregateMediator;
import org.apache.synapse.mediators.builtin.DropMediator;
import org.apache.synapse.mediators.base.SequenceMediator;
@@ -58,9 +57,6 @@
private static final QName MESSAGE_COUNT_Q
= new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "messageCount");
private static final QName ON_COMPLETE_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "onComplete");
- private static final QName INVALIDATE_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "invalidate");
-
- private static final QName TIME_TO_LIVE_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "timeToLive");
private static final QName EXPRESSION_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "expression");
private static final QName TIMEOUT_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "timeout");
private static final QName MIN_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "min");
@@ -71,11 +67,6 @@
AggregateMediator mediator = new AggregateMediator();
processTraceState(mediator, elem);
- // todo: need to fix
- OMAttribute timeToLive = elem.getAttribute(TIME_TO_LIVE_Q);
- if (timeToLive != null) {
- mediator.setTimeToInvalidate(Long.parseLong(timeToLive.getAttributeValue()) * 1000);
- }
OMElement corelateOn = elem.getFirstChildWithName(CORELATE_ON_Q);
if (corelateOn != null) {
@@ -84,7 +75,7 @@
try {
AXIOMXPath xp = new AXIOMXPath(corelateExpr.getAttributeValue());
OMElementUtils.addNameSpaces(xp, corelateOn, log);
- mediator.setCorelateExpression(xp);
+ mediator.setCorrelateExpression(xp);
} catch (JaxenException e) {
handleException("Unable to load the corelate XPATH expression", e);
}
@@ -95,7 +86,7 @@
if (completeCond != null) {
OMAttribute completeTimeout = completeCond.getAttribute(TIMEOUT_Q);
if (completeTimeout != null) {
- mediator.setCompleteTimeout(
+ mediator.setCompletionTimeoutMillis(
Long.parseLong(completeTimeout.getAttributeValue()) * 1000);
}
@@ -110,24 +101,6 @@
if (max != null) {
mediator.setMaxMessagesToComplete(Integer.parseInt(max.getAttributeValue()));
}
- }
- }
-
- OMElement invalidate = elem.getFirstChildWithName(INVALIDATE_Q);
- if (invalidate != null) {
- OMAttribute sequenceRef = invalidate.getAttribute(SEQUENCE_Q);
- if (sequenceRef != null) {
- mediator.setInvalidMsgSequenceRef(sequenceRef.getAttributeValue());
- } else if (invalidate.getFirstElement() != null) {
- mediator.setInvalidMsgSequence(
- (new SequenceMediatorFactory()).createAnonymousSequence(invalidate));
- }
-
- OMAttribute timeout = invalidate.getAttribute(TIMEOUT_Q);
- if (timeout != null) {
- mediator.setInvlidateToDestroyTime(Long.parseLong(timeout.getAttributeValue()));
- } else {
- mediator.setInvlidateToDestroyTime(300);
}
}
Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java Sun Jan 13 07:24:37 2008
@@ -20,11 +20,8 @@
package org.apache.synapse.config.xml;
import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMAttribute;
import org.apache.synapse.Mediator;
-import org.apache.synapse.SynapseException;
import org.apache.synapse.mediators.eip.aggregator.AggregateMediator;
-import org.apache.synapse.mediators.ext.ClassMediator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -55,16 +52,16 @@
OMElement aggregator = fac.createOMElement("aggregate", synNS);
saveTracingState(aggregator, mediator);
- if (mediator.getCorelateExpression() != null) {
+ if (mediator.getCorrelateExpression() != null) {
OMElement corelateOn = fac.createOMElement("corelateOn", synNS);
- corelateOn.addAttribute("expression", mediator.getCorelateExpression().toString(), nullNS);
- super.serializeNamespaces(corelateOn, mediator.getCorelateExpression());
+ corelateOn.addAttribute("expression", mediator.getCorrelateExpression().toString(), nullNS);
+ super.serializeNamespaces(corelateOn, mediator.getCorrelateExpression());
aggregator.addChild(corelateOn);
}
OMElement completeCond = fac.createOMElement("completeCondition", synNS);
- if (mediator.getCompleteTimeout() != 0) {
- completeCond.addAttribute("timeout", Long.toString(mediator.getCompleteTimeout()), nullNS);
+ if (mediator.getCompletionTimeoutMillis() != 0) {
+ completeCond.addAttribute("timeout", Long.toString(mediator.getCompletionTimeoutMillis()), nullNS);
}
OMElement messageCount = fac.createOMElement("messageCount", synNS);
if (mediator.getMinMessagesToComplete() != 0) {
@@ -88,16 +85,6 @@
onCompleteElem, mediator.getOnCompleteSequence().getList());
}
aggregator.addChild(onCompleteElem);
-
- OMElement invalidateElem = fac.createOMElement("invalidate", synNS);
- invalidateElem.addAttribute("timeout", Long.toString(mediator.getInvlidateToDestroyTime()), nullNS);
- if (mediator.getInvalidMsgSequenceRef() != null) {
- invalidateElem.addAttribute("sequence", mediator.getInvalidMsgSequenceRef(), nullNS);
- } else if (mediator.getInvalidMsgSequence() != null) {
- new SequenceMediatorSerializer().serializeChildren(
- invalidateElem, mediator.getInvalidMsgSequence().getList());
- }
- aggregator.addChild(invalidateElem);
if (parent != null) {
parent.addChild(aggregator);
Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/TargetFactory.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/TargetFactory.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/TargetFactory.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/TargetFactory.java Sun Jan 13 07:24:37 2008
@@ -64,7 +64,7 @@
Target target = new Target();
OMAttribute toAttr = elem.getAttribute(new QName(XMLConfigConstants.NULL_NAMESPACE, "to"));
if (toAttr != null && toAttr.getAttributeValue() != null) {
- target.setTo(toAttr.getAttributeValue());
+ target.setToAddress(toAttr.getAttributeValue());
}
OMAttribute soapAction = elem.getAttribute(
Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/TargetSerializer.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/TargetSerializer.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/TargetSerializer.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/TargetSerializer.java Sun Jan 13 07:24:37 2008
@@ -61,8 +61,8 @@
public static OMElement serializeTarget(Target target) {
OMElement targetElem = fac.createOMElement("target", synNS);
- if (target.getTo() != null) {
- targetElem.addAttribute("to", target.getTo(), nullNS);
+ if (target.getToAddress() != null) {
+ targetElem.addAttribute("to", target.getToAddress(), nullNS);
}
if (target.getSoapAction() != null) {
Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java Sun Jan 13 07:24:37 2008
@@ -68,7 +68,7 @@
public void run() {
try {
seq.mediate(synCtx);
- ((Axis2MessageContext)synCtx).getAxis2MessageContext().getEnvelope().discard();
+ //((Axis2MessageContext)synCtx).getAxis2MessageContext().getEnvelope().discard();
} catch (SynapseException syne) {
if (!synCtx.getFaultStack().isEmpty()) {
@@ -81,7 +81,7 @@
}
} catch (Exception e) {
- String msg = "Unexpected error executing task";
+ String msg = "Unexpected error executing task/async inject";
log.error(msg, e);
if (synCtx.getServiceLog() != null) {
synCtx.getServiceLog().error(msg, e);
Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPConstants.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPConstants.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPConstants.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPConstants.java Sun Jan 13 07:24:37 2008
@@ -19,11 +19,13 @@
package org.apache.synapse.mediators.eip;
-/** Holds all the constants related to the eip mediators */
+/** Constants related to the EIP mediators */
public final class EIPConstants {
- /** Constant for the correlation property key */
- public static final String AGGREGATE_CORELATION = "aggregateCorelation";
+ /** Typically the message ID of the parent message in a split/iterate etc so that
+ * its children could be uniquely aggregated by the aggrgate mediator etc
+ */
+ public static final String AGGREGATE_CORRELATION = "aggregateCorelation";
/** Constant for the message sequence property key */
public static final String MESSAGE_SEQUENCE = "messageSequence";
Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java Sun Jan 13 07:24:37 2008
@@ -32,23 +32,19 @@
import java.util.List;
/**
- * Utility methods for the EIP implementations
+ * Utility methods for the EIP mediators
*/
public class EIPUtils {
- /**
- * This will be used for logging purposes
- */
private static final Log log = LogFactory.getLog(EIPUtils.class);
/**
- * This static util method will be used to extract out the set of all elements described by the
- * given XPath over the given SOAPEnvelope
+ * Return the set of elements specified by the XPath over the given envelope
*
- * @param envelope SOAPEnvelope from which the the elements will be extracted
- * @param expression AXIOMXPath expression describing the elements
- * @return List of OMElements in the envelope matching the expression
- * @throws JaxenException if the XPath expression evaluation fails for some reason
+ * @param envelope SOAPEnvelope from which the elements will be extracted
+ * @param expression AXIOMXPath expression describing the elements to be extracted
+ * @return List OMElements in the envelope matching the expression
+ * @throws JaxenException if the XPath expression evaluation fails
*/
public static List getMatchingElements(SOAPEnvelope envelope, AXIOMXPath expression)
throws JaxenException {
@@ -61,15 +57,17 @@
} else if (o instanceof List) {
return (List) o;
} else {
- return null;
+ return new ArrayList();
}
}
/**
- * @param envelope
- * @param expression
- * @return
- * @throws JaxenException
+ * Return the set of detached elements specified by the XPath over the given envelope
+ *
+ * @param envelope SOAPEnvelope from which the elements will be extracted
+ * @param expression AXIOMXPath expression describing the elements to be extracted
+ * @return List detached OMElements in the envelope matching the expression
+ * @throws JaxenException if the XPath expression evaluation fails
*/
public static List getDetachedMatchingElements(SOAPEnvelope envelope, AXIOMXPath expression)
throws JaxenException {
@@ -90,8 +88,8 @@
}
/**
- * This static util method will be used to enrich the envelope passed, by the element described
- * by the XPath over the enricher envelope
+ * Merge two SOAP envelopes using the given XPath expression that specifies the
+ * element that enriches the first envelope from the second
*
* @param envelope SOAPEnvelope to be enriched with the content
* @param enricher SOAPEnvelope from which the enriching element will be extracted
@@ -99,12 +97,18 @@
*/
public static void enrichEnvelope(SOAPEnvelope envelope, SOAPEnvelope enricher,
AXIOMXPath expression) throws JaxenException {
+
OMElement enrichingElement;
- Object o = getMatchingElements(envelope, expression);
- if (o != null && o instanceof List && !((List) o).isEmpty()) {
- o = ((List) o).get(0);
+ List elementList = getMatchingElements(envelope, expression);
+
+ if (elementList != null && !elementList.isEmpty()) {
+
+ // attach at parent of the first result from the XPath, or to the SOAPBody
+ Object o = elementList.get(0);
- if (o instanceof OMElement && ((OMElement) o).getParent() instanceof OMElement) {
+ if (o instanceof OMElement &&
+ ((OMElement) o).getParent() != null &&
+ ((OMElement) o).getParent() instanceof OMElement) {
enrichingElement = (OMElement) ((OMElement) o).getParent();
} else {
enrichingElement = envelope.getBody();
Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/Target.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/Target.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/Target.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/Target.java Sun Jan 13 07:24:37 2008
@@ -19,49 +19,37 @@
package org.apache.synapse.mediators.eip;
-import org.apache.synapse.Mediator;
import org.apache.synapse.MessageContext;
import org.apache.synapse.endpoints.Endpoint;
import org.apache.synapse.mediators.base.SequenceMediator;
import org.apache.axis2.addressing.EndpointReference;
/**
- * This class will be a bean which carries the target information for most of the EIP mediators
+ * A bean class that holds the target (i.e. sequence or endpoint) information for a message
+ * as used by common EIP mediators
*/
public class Target {
- /**
- * Holds the to address of the target endpoint
- */
- private String to = null;
+ /** An optional To address to be set on the message when handing over to the target */
+ private String toAddress = null;
- /**
- * Holds the soapAction of the target service
- */
+ /** An optional Action to be set on the message when handing over to the target */
private String soapAction = null;
- /**
- * Holds the target mediation sequence as an anonymous sequence
- */
+ /** The inlined target sequence definition */
private SequenceMediator sequence = null;
- /**
- * Holds the target mediation sequence as a sequence reference
- */
+ /** The target sequence reference key */
private String sequenceRef = null;
- /**
- * Holds the target endpoint to which the message will be sent
- */
+ /** The inlined target endpoint definition */
private Endpoint endpoint = null;
- /**
- * Holds the reference to the target endpoint to which the message will be sent
- */
+ /** The target endpoint reference key */
private String endpointRef = null;
/**
- * This method will be called by the EIP mediators to mediated the target (may be to mediate
+ * process the message through this target (may be to mediate
* using the target sequence, send message to the target endpoint or both)
*
* @param synCtx - MessageContext to be mediated
@@ -72,14 +60,16 @@
synCtx.setSoapAction(soapAction);
}
- if (to != null) {
+ if (toAddress != null) {
if (synCtx.getTo() != null) {
- synCtx.getTo().setAddress(to);
+ synCtx.getTo().setAddress(toAddress);
} else {
- synCtx.setTo(new EndpointReference(to));
+ synCtx.setTo(new EndpointReference(toAddress));
}
}
+ // since we are injecting the new messages asynchronously, we cannot process a message
+ // through a sequence and then again with an endpoint
if (sequence != null) {
synCtx.getEnvironment().injectAsync(synCtx, sequence);
} else if (sequenceRef != null) {
@@ -87,9 +77,7 @@
if (refSequence != null) {
synCtx.getEnvironment().injectAsync(synCtx, refSequence);
}
- }
-
- if (endpoint != null) {
+ } else if (endpoint != null) {
endpoint.send(synCtx);
} else if (endpointRef != null) {
Endpoint epr = synCtx.getConfiguration().getEndpoint(endpointRef);
@@ -97,19 +85,18 @@
epr.send(synCtx);
}
}
-
}
///////////////////////////////////////////////////////////////////////////////////////
// Getters and Setters //
///////////////////////////////////////////////////////////////////////////////////////
- public String getTo() {
- return to;
+ public String getToAddress() {
+ return toAddress;
}
- public void setTo(String to) {
- this.to = to;
+ public void setToAddress(String toAddress) {
+ this.toAddress = toAddress;
}
public String getSoapAction() {
Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java Sun Jan 13 07:24:37 2008
@@ -30,83 +30,59 @@
import java.util.TimerTask;
/**
- * This holds the Aggregate properties and the list of messages which participate in the aggregation
+ * An instance of this class is created to manage each aggregation group, and it holds
+ * the aggregation properties and the messages collected during aggregation. This class also
+ * times out itself after the timeout expires it
*/
public class Aggregate extends TimerTask {
- /**
- *
- */
private static final Log log = LogFactory.getLog(Aggregate.class);
-
- /**
- *
- */
private static final Log trace = LogFactory.getLog(SynapseConstants.TRACE_LOGGER);
- /**
- *
- */
- private long timeout = 0;
-
- /**
- *
- */
- private long expireTime = 0;
-
- /**
- *
- */
+ private long timeoutMillis = 0;
+ /** The time in millis at which this aggregation should be considered as expired */
+ private long expiryTimeMillis = 0;
+ /** The minimum number of messages to be collected to consider this aggregation as complete */
private int minCount = -1;
-
- /**
- *
- */
+ /** The maximum number of messages that should be collected by this aggregation */
private int maxCount = -1;
-
- /**
- *
- */
- private String corelation = null;
-
- private AggregateMediator mediator = null;
-
- /**
- *
- */
+ private String correlation = null;
+ /** The AggregateMediator that should be invoked on completion of the aggregation */
+ private AggregateMediator aggregateMediator = null;
private List<MessageContext> messages = new ArrayList<MessageContext>();
/**
- * This is the constructor of the Aggregate which will set the timeout depending on the
- * timeout for the aggregate
+ * Save aggregation properties and timeout
*
- * @param corelation - String representing the corelation name of the messages in the aggregate
- * @param timeout -
- * @param min -
- * @param max -
- * @param mediator -
- */
- public Aggregate(String corelation, long timeout, int min, int max, AggregateMediator mediator) {
- this.corelation = corelation;
- if (timeout > 0) {
- this.timeout = System.currentTimeMillis() + expireTime;
+ * @param corelation representing the corelation name of the messages in the aggregate
+ * @param timeoutMillis the timeout duration in milliseconds
+ * @param min the minimum number of messages to be aggregated
+ * @param max the maximum number of messages to be aggregated
+ * @param mediator
+ */
+ public Aggregate(String corelation, long timeoutMillis, int min, int max, AggregateMediator mediator) {
+ this.correlation = corelation;
+ if (timeoutMillis > 0) {
+ expiryTimeMillis = System.currentTimeMillis() + timeoutMillis;
}
if (min > 0) {
- this.minCount = min;
+ minCount = min;
}
if (max > 0) {
- this.maxCount = max;
+ maxCount = max;
}
- this.mediator = mediator;
+ this.aggregateMediator = mediator;
}
/**
- * @param synCtx -
- * @return true if the message was added and false if not
+ * Add a message to the interlan message list
+ *
+ * @param synCtx message to be added into this aggregation group
+ * @return true if the message was added or false if not
*/
public boolean addMessage(MessageContext synCtx) {
- if (this.maxCount > 0 && this.messages.size() < this.maxCount || this.maxCount <= 0) {
- this.messages.add(synCtx);
+ if (maxCount <= 0 || (maxCount > 0 && messages.size() < maxCount)) {
+ messages.add(synCtx);
return true;
} else {
return false;
@@ -114,42 +90,87 @@
}
/**
- * @return boolean stating the completeness of the corelation
+ * Has this aggregation group completed?
+ *
+ * @return boolean true if aggregation is complete
*/
- public boolean isComplete() {
+ public boolean isComplete(boolean traceOn, boolean traceOrDebugOn, Log trace, Log log) {
- boolean completed = false;
+ // if any messages have been collected, check if the completion criteria is met
if (!messages.isEmpty()) {
- Object o = messages.get(0);
- if (o instanceof MessageContext) {
-
- Object prop = ((MessageContext) o).getProperty(EIPConstants.MESSAGE_SEQUENCE);
- if (prop instanceof String) {
+ // get total messages for this group, from the first message we have collected
+ MessageContext mc = messages.get(0);
+ Object prop = mc.getProperty(EIPConstants.MESSAGE_SEQUENCE);
+
+ if (prop != null && prop instanceof String) {
+ String[] msgSequence = prop.toString().split(EIPConstants.MESSAGE_SEQUENCE_DELEMITER);
+ int total = Integer.parseInt(msgSequence[1]);
+
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, trace, log, messages.size() +
+ " messages of " + total + " collected in current aggregation");
+ }
- String[] msgSequence
- = prop.toString().split(EIPConstants.MESSAGE_SEQUENCE_DELEMITER);
- if (messages.size() >= Integer.parseInt(msgSequence[1])) {
- completed = true;
+ if (messages.size() >= total) {
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, trace, log, "Aggregation complete");
}
+ return true;
}
}
+ } else {
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, trace, log, "No messages collected in current aggregation");
+ }
}
- if (!completed && this.minCount > 0) {
- completed = this.messages.size() >= this.minCount
- || this.timeout < System.currentTimeMillis();
+ // if the minimum number of messages has been reached, its complete
+ if (minCount > 0 && messages.size() >= minCount) {
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, trace, log,
+ "Aggregation complete - the minimum : " + minCount + " messages has been reached");
+ }
+ return true;
}
- return completed;
+ if (maxCount > 0 && messages.size() >= maxCount) {
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, trace, log,
+ "Aggregation complete - the maximum : " + maxCount + " messages has been reached");
+ }
+
+ return true;
+ }
+
+ // else, has this aggregation reached its timeout?
+ if (System.currentTimeMillis() >= expiryTimeMillis) {
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, trace, log,
+ "Aggregation complete - the aggregation has timed out");
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
+ private void traceOrDebug(boolean traceOn, Log trace, Log log, String msg) {
+ if (traceOn) {
+ trace.info(msg);
+ }
+ if (log.isDebugEnabled()) {
+ log.debug(msg);
+ }
}
- public long getTimeout() {
- return timeout;
+ public long getTimeoutMillis() {
+ return timeoutMillis;
}
- public void setTimeout(long timeout) {
- this.timeout = timeout;
+ public void setTimeoutMillis(long timeoutMillis) {
+ this.timeoutMillis = timeoutMillis;
}
public int getMinCount() {
@@ -168,15 +189,15 @@
this.maxCount = maxCount;
}
- public String getCorelation() {
- return corelation;
+ public String getCorrelation() {
+ return correlation;
}
- public void setCorelation(String corelation) {
- this.corelation = corelation;
+ public void setCorrelation(String correlation) {
+ this.correlation = correlation;
}
- public List getMessages() {
+ public List<MessageContext> getMessages() {
return messages;
}
@@ -184,15 +205,19 @@
this.messages = messages;
}
- public long getExpireTime() {
- return expireTime;
+ public long getExpiryTimeMillis() {
+ return expiryTimeMillis;
}
- public void setExpireTime(long expireTime) {
- this.expireTime = expireTime;
+ public void setExpiryTimeMillis(long expiryTimeMillis) {
+ this.expiryTimeMillis = expiryTimeMillis;
}
public void run() {
- mediator.completeAggregate(this);
+ if (log.isDebugEnabled()) {
+ log.debug("Time : " + System.currentTimeMillis() + " and this aggregator expired at : " +
+ expiryTimeMillis);
+ }
+ aggregateMediator.completeAggregate(this);
}
}
Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java Sun Jan 13 07:24:37 2008
@@ -25,7 +25,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.MessageContext;
-import org.apache.synapse.SynapseException;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.mediators.AbstractMediator;
import org.apache.synapse.mediators.eip.EIPUtils;
@@ -36,95 +35,51 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.Collections;
/**
- * This mediator will aggregate the messages flowing in to this with the specified message types
- * and build a one message
+ * Aggregate a number of messages that are determined to be for a particular group, and combine
+ * them to form a single message which is then processed through the 'onComplete' sequence. Thus
+ * an aggregator acts like a filter, and may look at a correlation XPath expression to select
+ * messages for aggregation - or look at messageSequence number properties for aggregation or
+ * let any other (i.e. non aggregatable) messages flow through
+ * An instance of this mediator will register with a Timer to be notified after a specified timeout,
+ * so that aggregations that never would complete could be timed out and cleared from memory and
+ * any fault conditions handled
*/
public class AggregateMediator extends AbstractMediator {
private static final Log log = LogFactory.getLog(AggregateMediator.class);
-
private static final Log trace = LogFactory.getLog(SynapseConstants.TRACE_LOGGER);
- /**
- * This will hold the maximum lifetime of an aggregate and if a particular aggregate does not
- * completed before its life time it will be invalidated and taken off from the activeAggregates
- * map and put in to the expiredAggregates map and the invalidate sequence will be called to
- * mediate the messages in the expired aggregate if there are any
- */
- private long timeToInvalidate = 0;
-
- /**
- * Messages coming to the aggregator will be examined for the existance of a node described
- * in this XPATH and if it contains the XPATH pick that, if not try to find the messageSequence
- * property for the correlation and if not pass the message through
- */
- private AXIOMXPath corelateExpression = null;
-
- /**
- * This will be used in the complete condition to complete the aggregation after waiting a
- * specified timeout and send the messages gathered in the aggregate after aggregation
- * if there are any messages
- */
- private long completeTimeout = 0;
-
- /**
- * Minimum number of messages required to evaluate the complete condition to true unless the
- * aggregate has timed out with the provided timeout if there is a one
- */
+ /** The duration as a number of milliseconds for this aggregation to complete */
+ private long completionTimeoutMillis = 0;
+ /** The minimum number of messages required to complete aggregation */
private int minMessagesToComplete = -1;
-
- /**
- * Maximum number of messages that can be contained in a particular aggregation
- */
+ /** The maximum number of messages required to complete aggregation */
private int maxMessagesToComplete = -1;
/**
- * This will hold the implementation of the aggregation algorithm and upon validating the
- * complete condition getAggregatedMessage method of the aggregator will be called to get
- * the aggregated message
- */
- private AXIOMXPath aggregationExpression = null;
-
- /**
- * Holds a String reference to the Named Sequence which will be called to mediate the invalid
- * messages coming in to the aggregator
+ * XPath that specifies a correlation expression that can be used to combine messages. An
+ * example maybe //department@id="11"
*/
- private String invalidMsgSequenceRef = null;
-
- /**
- * Sequence which will be called to mediate the invalid messages coming in to aggregator
- */
- private SequenceMediator invalidMsgSequence = null;
-
+ private AXIOMXPath correlateExpression = null;
/**
- * This will be used to destroy the aggregates which were kept in the expiredAggregates map
+ * An XPath expression that may specify a selected element to be aggregated from a group of
+ * messages to create the aggregated message
+ * e.g. //getQuote/return would pick up and aggregate the //getQuote/return elements from a
+ * bunch of matching messages into one aggregated message
*/
- private long invlidateToDestroyTime = 0;
+ private AXIOMXPath aggregationExpression = null;
- /**
- * This holds the reference sequence name of the
- */
+ /** This holds the reference sequence name of the */
private String onCompleteSequenceRef = null;
-
- /**
- *
- */
+ /** Inline sequence definition holder that holds the onComplete sequence */
private SequenceMediator onCompleteSequence = null;
- /**
- * This will hold the map of active aggregates at any given time
- */
- private Map<String, Aggregate> activeAggregates = new HashMap<String, Aggregate>();
-
- /**
- * This will hold the expired aggregates at any given time, these will be cleaned by a timer
- * task time to time in order to ensure uncontrolled growth
- */
- private Map expiredAggregates = new HashMap();
-
- private boolean isTimerSet = false;
+ /** The active aggregates currently being processd */
+ private Map<String, Aggregate> activeAggregates =
+ Collections.synchronizedMap(new HashMap<String, Aggregate>());
public AggregateMediator() {
try {
@@ -141,16 +96,14 @@
}
/**
- * This is the mediate method implementation of the AggregateMediator. And this will aggregate
- * the messages going through this mediator according to the correlation criteria and the
- * aggregation algorithm specified to it
+ * Aggregate messages flowing through this mediator according to the correlation criteria
+ * and the aggregation algorithm specified to it
*
* @param synCtx - MessageContext to be mediated and aggregated
* @return boolean true if the complete condition for the particular aggregate is validated
- * false if not
*/
public boolean mediate(MessageContext synCtx) {
- // tracing and debuggin related mediation initiation
+
boolean traceOn = isTraceOn(synCtx);
boolean traceOrDebugOn = isTraceOrDebugOn(traceOn);
@@ -162,114 +115,110 @@
}
}
-// todo: revisit this
-// if (!isTimerSet) {
-// synCtx.getConfiguration().getSynapseTimer()
-// .schedule(new AggregateCollector(this), 5000);
-// }
-
try {
Aggregate aggregate = null;
- // if the corelate aggregationExpression is provided and there is a coresponding
- // element in the message corelate the messages on that
- if (this.corelateExpression != null
- && this.corelateExpression.evaluate(synCtx.getEnvelope()) != null) {
-
- if (activeAggregates.containsKey(this.corelateExpression.toString())) {
- Object o = activeAggregates.get(this.corelateExpression.toString());
- if (o instanceof Aggregate) {
- aggregate = (Aggregate) o;
- } else {
- handleException("Undefined aggregate type.", synCtx);
- }
- } else {
- aggregate = new Aggregate(this.corelateExpression.toString(),
- this.completeTimeout, this.minMessagesToComplete,
- this.maxMessagesToComplete, this);
- synCtx.getConfiguration().getSynapseTimer().schedule(aggregate, completeTimeout);
- activeAggregates.put(this.corelateExpression.toString(), aggregate);
- }
-
- // if the corelattion can not be found using the aggregationExpression try to find the
- // corelation on the default criteria which is through the aggregate corelation
- // property of the message
- } else if (synCtx.getProperty(EIPConstants.AGGREGATE_CORELATION) != null) {
-
- String corelation = synCtx.getProperty(
- EIPConstants.AGGREGATE_CORELATION) instanceof String ? synCtx.getProperty(
- EIPConstants.AGGREGATE_CORELATION).toString() : null;
+ // if a correlateExpression is provided and there is a coresponding
+ // element in the current message prepare to correlate the messages on that
+ if (correlateExpression != null
+ && correlateExpression.evaluate(synCtx.getEnvelope()) != null) {
- // check whether the message corelation name is in the expired aggregates
- if (expiredAggregates.containsKey(corelation)) {
+ if (activeAggregates.containsKey(correlateExpression.toString())) {
+ aggregate = activeAggregates.get(correlateExpression.toString());
+ } else {
if (traceOrDebugOn) {
- traceOrDebug(traceOn, "Message with the corelation "
- + corelation + " expired. Invalidating the message.");
+ traceOrDebug(traceOn, "Creating new Aggregator - expires in : " +
+ (completionTimeoutMillis / 1000) + "secs");
}
- invalidate(synCtx, traceOrDebugOn, traceOn);
- return false;
+ aggregate = new Aggregate(
+ correlateExpression.toString(),
+ completionTimeoutMillis,
+ minMessagesToComplete,
+ maxMessagesToComplete, this);
+ synCtx.getConfiguration().getSynapseTimer().
+ schedule(aggregate, completionTimeoutMillis);
+ activeAggregates.put(correlateExpression.toString(), aggregate);
}
- if (corelation != null) {
+ } else if (synCtx.getProperty(EIPConstants.AGGREGATE_CORRELATION) != null) {
+ // if the correlattion cannot be found using the correlateExpression then
+ // try the default which is through the AGGREGATE_CORRELATION message property
+ // which is the unique original message id of a split or iterate operation and
+ // which thus can be used to uniquely group messages into aggregates
- if (activeAggregates.containsKey(corelation)) {
+ Object o = synCtx.getProperty(EIPConstants.AGGREGATE_CORRELATION);
+ String correlation = null;
- Object o = activeAggregates.get(corelation);
- if (o instanceof Aggregate) {
- aggregate = (Aggregate) o;
- } else {
- handleException("Undefined aggregate type.", synCtx);
- }
+ if (o != null && o instanceof String) {
+ correlation = (String) o;
+
+ if (activeAggregates.containsKey(correlation)) {
+ aggregate = activeAggregates.get(correlation);
} else {
- aggregate = new Aggregate(corelation, this.completeTimeout,
- this.minMessagesToComplete, this.maxMessagesToComplete, this);
- synCtx.getConfiguration().getSynapseTimer().schedule(aggregate, completeTimeout);
- activeAggregates.put(corelation, aggregate);
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, "Creating new Aggregator - expires in : " +
+ (completionTimeoutMillis / 1000) + "secs");
+ }
+
+ aggregate = new Aggregate(
+ correlation,
+ completionTimeoutMillis,
+ minMessagesToComplete,
+ maxMessagesToComplete, this);
+ synCtx.getConfiguration().getSynapseTimer().
+ schedule(aggregate, completionTimeoutMillis);
+ activeAggregates.put(correlation, aggregate);
}
} else {
if (traceOrDebugOn) {
- traceOrDebug(traceOn,
- "Error in getting corelation details. Skip the aggregator.");
+ traceOrDebug(traceOn, "Unable to find aggrgation correlation property");
}
return true;
}
} else {
if (traceOrDebugOn) {
- traceOrDebug(traceOn,
- "Unable to find the aggregation corelation. Skip the aggregation");
+ traceOrDebug(traceOn, "Unable to find aggrgation correlation XPath or property");
}
return true;
}
// if there is an aggregate continue on aggregation
if (aggregate != null) {
-
- // add the message to the aggregate and if the maximum count of the aggregate is
- // exceeded invalidate the message
- if (!aggregate.addMessage(synCtx)) {
- if (traceOrDebugOn) {
- traceOrDebug(traceOn, "Can not exceed aggregate " +
- "max message count. Invalidating message");
+ boolean collected = aggregate.addMessage(synCtx);
+ if (traceOrDebugOn) {
+ if (collected) {
+ traceOrDebug(traceOn, "Collected a message during aggregation");
+ if (traceOn && trace.isTraceEnabled()) {
+ trace.trace("Collected message : " + synCtx);
+ }
}
- invalidate(synCtx, traceOrDebugOn, traceOn);
- return false;
}
-
- // check the completeness of the aggregate and is completed aggregate the messages
+
+ // check the completeness of the aggregate and if completed aggregate the messages
// if not completed return false and block the message sequence till it completes
- if (aggregate.isComplete()) {
- return completeAggregate(aggregate);
+
+ if (aggregate.isComplete(traceOn, traceOrDebugOn, trace, log)) {
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, "Aggregation completed - invoking onComplete");
+ }
+ completeAggregate(aggregate);
+
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, "End : Aggregate mediator");
+ }
+ return true;
}
- // if the aggregation corelation can not be found then continue the message on the
- // normal path by returning true
} else {
+ // if the aggregation correlation cannot be found then continue the message on the
+ // normal path by returning true
+
if (traceOrDebugOn) {
- traceOrDebug(traceOn, "Unable to find the aggregate. Skip the aggregation");
+ traceOrDebug(traceOn, "Unable to find an aggregate for this message - skip");
}
return true;
}
@@ -278,7 +227,6 @@
handleException("Unable to execute the XPATH over the message", e, synCtx);
}
- // finalize tracing and debugging
if (traceOrDebugOn) {
traceOrDebug(traceOn, "End : Aggregate mediator");
}
@@ -286,115 +234,104 @@
return false;
}
- private void invalidate(MessageContext synCtx, boolean traceOrDebugOn, boolean traceOn) {
+ /**
+ * Invoked by the Aggregate objects that are timed out, to signal timeout/completion of
+ * itself
+ * @param aggregate the timed out Aggregate that holds collected messages and properties
+ */
+ public void completeAggregate(Aggregate aggregate) {
- if (this.invalidMsgSequenceRef != null && synCtx.getConfiguration()
- .getSequence(invalidMsgSequenceRef) != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Aggregation completed or timed out");
+ }
- // use the sequence reference to get the sequence for mediation
- synCtx.getConfiguration().getSequence(invalidMsgSequenceRef).mediate(synCtx);
+ // cancel the timer
+ aggregate.cancel();
- } else if (this.invalidMsgSequence != null) {
+ MessageContext newSynCtx = getAggregatedMessage(aggregate);
+ if (newSynCtx == null) {
+ log.warn("An aggregation of messages timed out with no aggregated messages", null);
+ return;
+ }
- // use the sequence to mediate the invalidated messages
- invalidMsgSequence.mediate(synCtx);
+ activeAggregates.remove(aggregate);
- } else {
- if (traceOrDebugOn) {
- traceOrDebug(traceOn, "No invalid message sequence defined. Dropping the message");
+ if ((correlateExpression != null &&
+ !correlateExpression.toString().equals(aggregate.getCorrelation())) ||
+ correlateExpression == null) {
+
+ if (onCompleteSequence != null) {
+ onCompleteSequence.mediate(newSynCtx);
+
+ } else if (onCompleteSequenceRef != null
+ && newSynCtx.getSequence(onCompleteSequenceRef) != null) {
+ newSynCtx.getSequence(onCompleteSequenceRef).mediate(newSynCtx);
+
+ } else {
+ handleException("Unable to find the sequence for the mediation " +
+ "of the aggregated message", newSynCtx);
}
}
}
- public boolean completeAggregate(Aggregate aggregate) {
+ /**
+ * Get the aggregated message from the specified Aggregate instance
+ *
+ * @param aggregate the Aggregate object that holds collected messages and properties of the
+ * aggregation
+ * @return the aggregated message context
+ */
+ private MessageContext getAggregatedMessage(Aggregate aggregate) {
+
+ MessageContext newCtx = null;
+ Iterator<MessageContext> itr = aggregate.getMessages().iterator();
- MessageContext newSynCtx = getAggregatedMessage(aggregate);
- activeAggregates.remove(aggregate.getCorelation());
+ while (itr.hasNext()) {
+ MessageContext synCtx = itr.next();
+ if (newCtx == null) {
+ newCtx = synCtx;
- if ((this.corelateExpression != null && !this.corelateExpression
- .toString().equals(aggregate.getCorelation())) ||
- this.corelateExpression == null) {
-
-// aggregate.setExpireTime(
-// System.currentTimeMillis() + this.invlidateToDestroyTime);
- expiredAggregates.put(aggregate.getCorelation(),
- new Long(System.currentTimeMillis() + this.invlidateToDestroyTime));
-
- if (this.onCompleteSequence != null) {
- this.onCompleteSequence.mediate(newSynCtx);
- } else if (this.onCompleteSequenceRef != null
- && newSynCtx.getSequence(this.onCompleteSequenceRef) != null) {
- newSynCtx.getSequence(this.onCompleteSequenceRef).mediate(newSynCtx);
- } else {
- handleException("Unable to find the sequence for the mediation " +
- "of the aggregated message", newSynCtx);
+ if (log.isDebugEnabled()) {
+ log.debug("Generating Aggregated message from : " + newCtx.getEnvelope());
}
- return false;
+
} else {
- return true;
- }
- }
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Merging message : " + synCtx.getEnvelope() + " using XPath : " +
+ aggregationExpression);
+ }
- public MessageContext getAggregatedMessage(Aggregate aggregate) {
- MessageContext newCtx = null;
- Iterator itr = aggregate.getMessages().iterator();
- while (itr.hasNext()) {
- Object o = itr.next();
- if (o instanceof MessageContext) {
- MessageContext synCtx = (MessageContext) o;
- if (newCtx == null) {
- newCtx = synCtx;
- } else {
- try {
- EIPUtils.enrichEnvelope(
- newCtx.getEnvelope(), synCtx.getEnvelope(), this.aggregationExpression);
- } catch (JaxenException e) {
- handleException("Unable to get the aggreagated message", e, synCtx);
+ EIPUtils.enrichEnvelope(
+ newCtx.getEnvelope(), synCtx.getEnvelope(), aggregationExpression);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Merged result : " + newCtx.getEnvelope());
}
+
+ } catch (JaxenException e) {
+ handleException("Error merging aggregation results using XPath : " +
+ aggregationExpression.toString(), e, synCtx);
}
}
}
return newCtx;
}
- public AXIOMXPath getCorelateExpression() {
- return corelateExpression;
- }
-
- public void setCorelateExpression(AXIOMXPath corelateExpression) {
- this.corelateExpression = corelateExpression;
- }
-
- public String getInvalidMsgSequenceRef() {
- return invalidMsgSequenceRef;
- }
-
- public void setInvalidMsgSequenceRef(String invalidMsgSequenceRef) {
- this.invalidMsgSequenceRef = invalidMsgSequenceRef;
+ public AXIOMXPath getCorrelateExpression() {
+ return correlateExpression;
}
- public SequenceMediator getInvalidMsgSequence() {
- return invalidMsgSequence;
+ public void setCorrelateExpression(AXIOMXPath correlateExpression) {
+ this.correlateExpression = correlateExpression;
}
- public void setInvalidMsgSequence(SequenceMediator invalidMsgSequence) {
- this.invalidMsgSequence = invalidMsgSequence;
+ public long getCompletionTimeoutMillis() {
+ return completionTimeoutMillis;
}
- public long getTimeToInvalidate() {
- return timeToInvalidate;
- }
-
- public void setTimeToInvalidate(long timeToInvalidate) {
- this.timeToInvalidate = timeToInvalidate;
- }
-
- public long getCompleteTimeout() {
- return completeTimeout;
- }
-
- public void setCompleteTimeout(long completeTimeout) {
- this.completeTimeout = completeTimeout;
+ public void setCompletionTimeoutMillis(long completionTimeoutMillis) {
+ this.completionTimeoutMillis = completionTimeoutMillis;
}
public int getMinMessagesToComplete() {
@@ -421,14 +358,6 @@
this.aggregationExpression = aggregationExpression;
}
- public long getInvlidateToDestroyTime() {
- return invlidateToDestroyTime;
- }
-
- public void setInvlidateToDestroyTime(long invlidateToDestroyTime) {
- this.invlidateToDestroyTime = invlidateToDestroyTime;
- }
-
public String getOnCompleteSequenceRef() {
return onCompleteSequenceRef;
}
@@ -443,10 +372,6 @@
public void setOnCompleteSequence(SequenceMediator onCompleteSequence) {
this.onCompleteSequence = onCompleteSequence;
- }
-
- public Map getExpiredAggregates() {
- return expiredAggregates;
}
public Map getActiveAggregates() {
Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/CloneMediator.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/CloneMediator.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/CloneMediator.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/CloneMediator.java Sun Jan 13 07:24:37 2008
@@ -20,9 +20,12 @@
package org.apache.synapse.mediators.eip.splitter;
import org.apache.synapse.MessageContext;
+import org.apache.synapse.ManagedLifecycle;
import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.util.MessageHelper;
import org.apache.synapse.mediators.AbstractMediator;
+import org.apache.synapse.mediators.base.SequenceMediator;
import org.apache.synapse.mediators.eip.Target;
import org.apache.synapse.mediators.eip.EIPConstants;
import org.apache.axis2.AxisFault;
@@ -31,35 +34,34 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Iterator;
/**
- * This mediator will clone the message in to different messages and mediated as specified in the
- * target elements.
+ * This mediator will clone the message into multiple messages and mediate as specified in the
+ * target elements. A target specifies or refers to a sequence or an endpoint, and optionally
+ * specifies an Action and/or To address to be set to the cloned message. The number of cloned
+ * messages created is the number of targets specified
*/
-public class CloneMediator extends AbstractMediator {
+public class CloneMediator extends AbstractMediator implements ManagedLifecycle {
/**
- * This variable specifies whether to continue the parent message (i.e. message which is
- * subjected to cloning) or not
+ * Continue processing the parent message or not?
+ * (i.e. message which is subjected to cloning)
*/
private boolean continueParent = false;
- /**
- * Holds the list of targets to which cloned copies of the message will be given for mediation
- */
+ /** the list of targets to which cloned copies of the message will be given for mediation */
private List<Target> targets = new ArrayList<Target>();
/**
* This will implement the mediate method of the Mediator interface and will provide the
- * functionality of cloning message in to the specified targets and mediation
+ * functionality of cloning message into the specified targets and mediation
*
* @param synCtx - MessageContext which is subjected to the cloning
- * @return boolean true if this needs to be further mediated (continueParent=true) false
- * otherwise
+ * @return boolean true if this needs to be further mediated (continueParent=true)
*/
public boolean mediate(MessageContext synCtx) {
- // tracing and debuggin related mediation initiation
boolean traceOn = isTraceOn(synCtx);
boolean traceOrDebugOn = isTraceOrDebugOn(traceOn);
@@ -73,23 +75,15 @@
// get the targets list, clone the message for the number of targets and then
// mediate the cloned messages using the targets
- if (targets.size() != 0) {
-
- for (int i = 0; i < targets.size(); i++) {
- // clone message context for this target
- MessageContext newContext = getClonedMessageContext(synCtx, i, targets.size());
- Object o = targets.get(i);
-
- if (o instanceof Target) {
- Target target = (Target) o;
- target.mediate(newContext);
- }
+ Iterator<Target> iter = targets.iterator();
+ int i = 0;
+ while (iter.hasNext()) {
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, "Submitting " + (i+1) + " of " + targets.size() +
+ " messages for processing in parallel");
}
- }
- // finalize tracing and debugging
- if (traceOrDebugOn) {
- traceOrDebug(traceOn, "End : Clone mediator");
+ iter.next().mediate(getClonedMessageContext(synCtx, i++, targets.size()));
}
// if the continuation of the parent message is stopped from here set the RESPONSE_WRITTEN
@@ -100,35 +94,40 @@
opCtx.setProperty(Constants.RESPONSE_WRITTEN, "SKIP");
}
+ // finalize tracing and debugging
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, "End : Clone mediator");
+ }
+
// if continue parent is true mediators after the clone will be called for the further
// mediation of the message which is subjected for clonning (parent message)
return continueParent;
}
/**
- * This private method is used to clone the MC in to a new MC
+ * clone the provided message context as a new message, and mark as the messageSequence'th
+ * message context of a total of messageCount messages
*
- * @param synCtx - MessageContext which is subjected to the clonning
- * @param messageSequence - int clonning message number
- * @param messageCount - int complete count of cloned messages
- * @return MessageContext which is cloned from the given parameters
+ * @param synCtx - MessageContext which is subjected to the cloning
+ * @param messageSequence - the position of this message of the cloned set
+ * @param messageCount - total of cloned copies
+ * @return MessageContext the cloned message context
*/
private MessageContext getClonedMessageContext(MessageContext synCtx, int messageSequence,
- int messageCount) {
+ int messageCount) {
MessageContext newCtx = null;
try {
- // clones the message context
newCtx = MessageHelper.cloneMessageContext(synCtx);
+
+ // set the property MESSAGE_SEQUENCE to the MC for aggregation purposes
+ newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE,
+ String.valueOf(messageSequence) + EIPConstants.MESSAGE_SEQUENCE_DELEMITER +
+ messageCount);
} catch (AxisFault axisFault) {
- handleException("Error creating a new message context", axisFault, synCtx);
+ handleException("Error cloning the message context", axisFault, synCtx);
}
- // Sets the property MESSAGE_SEQUENCE to the MC for aggragation purposes
- assert newCtx != null;
- newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE, String.valueOf(messageSequence)
- + EIPConstants.MESSAGE_SEQUENCE_DELEMITER + messageCount);
-
return newCtx;
}
@@ -154,6 +153,26 @@
public void addTarget(Target target) {
this.targets.add(target);
+ }
+
+ public void init(SynapseEnvironment se) {
+ Iterator<Target> iter = targets.iterator();
+ while (iter.hasNext()) {
+ SequenceMediator seq = iter.next().getSequence();
+ if (seq != null) {
+ seq.init(se);
+ }
+ }
+ }
+
+ public void destroy() {
+ Iterator<Target> iter = targets.iterator();
+ while (iter.hasNext()) {
+ SequenceMediator seq = iter.next().getSequence();
+ if (seq != null) {
+ seq.destroy();
+ }
+ }
}
}
Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/IterateMediator.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/IterateMediator.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/IterateMediator.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/IterateMediator.java Sun Jan 13 07:24:37 2008
@@ -41,49 +41,42 @@
import java.util.Iterator;
/**
- * This mediator will split the message in the criterian specified to it and inject in to Synapse
+ * Splits a message using an XPath expression and creates a new message to hold
+ * each resulting element. This is very much similar to the clone mediator, and
+ * hands over the newly created messages to a target for processing
*/
public class IterateMediator extends AbstractMediator implements ManagedLifecycle {
- /**
- * This holds whether to continue mediation on the parent message or not
- */
+ /** Continue mediation on the parent message or not? */
private boolean continueParent = false;
/**
- * This holds whether to preserve the payload and attach the iteration child to specified node
- * or to attach the child to the body of the envelope
+ * Preserve the payload as a template to create new messages with the selected
+ * elements with the rest of the parent, or create new message that contain only
+ * the selected element as its payload?
*/
private boolean preservePayload = false;
- /**
- * This holds the expression which will be evaluated for the presence of elements in the
- * mediating message for iterations
- */
+ /** The XPath that will list the elements to be splitted */
private AXIOMXPath expression = null;
/**
- * This holds the node to which the iteration childs will be attached. This does not have any
- * meaning when the preservePayload is set to false
+ * An XPath expression that specifies where the splitted elements should be attached when
+ * the payload is being preserved
*/
private AXIOMXPath attachPath = null;
- /**
- * This holds the target object for the newly created messages by the iteration
- */
+ /** The target for the newly splitted messages */
private Target target = null;
/**
- * This method implemenents the Mediator interface and this mediator implements the message
- * splitting logic
+ * Splits the message by iterating over the results of the given XPath expression
*
* @param synCtx - MessageContext to be mediated
- * @return boolean false if need to stop processing the parent message, boolean true if further
- * processing of the parent message is required
+ * @return boolean false if need to stop processing of the parent message
*/
public boolean mediate(MessageContext synCtx) {
- // initializes the logging and tracing for the mediator
boolean traceOn = isTraceOn(synCtx);
boolean traceOrDebugOn = isTraceOrDebugOn(traceOn);
@@ -104,48 +97,44 @@
// get the iteration elements and iterate through the list,
// this call will also detach all the iteration elements
List splitElements = EIPUtils.getDetachedMatchingElements(envelope, expression);
- if (splitElements != null) {
- int msgCount = splitElements.size();
- int msgNumber = 0;
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, "Splitting with XPath : " + expression + " resulted in " +
+ splitElements.size() + " elements");
+ }
- // if not preservePayload remove all the child elements
- if (!preservePayload && envelope.getBody() != null) {
- for (Iterator itr = envelope.getBody().getChildren(); itr.hasNext();) {
- ((OMNode) itr.next()).detach();
- }
+ // if not preservePayload remove all the child elements
+ if (!preservePayload && envelope.getBody() != null) {
+ for (Iterator itr = envelope.getBody().getChildren(); itr.hasNext();) {
+ ((OMNode) itr.next()).detach();
}
+ }
- // iterate through the list
- for (Object o : splitElements) {
+ int msgCount = splitElements.size();
+ int msgNumber = 0;
- // for the moment iterator will look for an OMNode as the iteration element
- if (!(o instanceof OMNode)) {
- handleException("Error in splitting the message with expression : "
- + expression, synCtx);
- }
-
- target.mediate(
- getIteratedMessage(synCtx, msgNumber, msgCount, envelope, (OMNode) o));
- msgNumber++;
+ // iterate through the list
+ for (Object o : splitElements) {
+ // for the moment iterator will look for an OMNode as the iteration element
+ if (!(o instanceof OMNode)) {
+ handleException("Error splitting message with XPath : "
+ + expression + " - result not an OMNode", synCtx);
}
- } else {
- handleException("Splitting by expression : " + expression
- + " did not yeild in an OMElement", synCtx);
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, "Submitting " + (msgNumber+1) + " of " + msgNumber +
+ " messages for processing in parallel");
+ }
+
+ target.mediate(
+ getIteratedMessage(synCtx, msgNumber++, msgCount, envelope, (OMNode) o));
}
} catch (JaxenException e) {
- handleException("Error evaluating XPath expression : " + expression, e, synCtx);
- } catch (AxisFault axisFault) {
- handleException("Unable to split the message using the expression : " + expression,
- axisFault, synCtx);
- }
-
- // finalizing the tracing and logging on the iterate mediator
- if (traceOrDebugOn) {
- traceOrDebug(traceOn, "End : Iterate mediator");
+ handleException("Error evaluating split XPath expression : " + expression, e, synCtx);
+ } catch (AxisFault af) {
+ handleException("Error creating an iterated copy of the message", af, synCtx);
}
// if the continuation of the parent message is stopped from here set the RESPONSE_WRITTEN
@@ -156,17 +145,22 @@
opCtx.setProperty(Constants.RESPONSE_WRITTEN,"SKIP");
}
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, "End : Iterate mediator");
+ }
+
// whether to continue mediation on the original message
return continueParent;
}
/**
- * This will create a new message context with the iteration parameters
+ * Create a new message context using the given original message context, the envelope
+ * and the split result element.
*
* @param synCtx - original message context
* @param msgNumber - message number in the iteration
- * @param msgCount - message count in the iteration
- * @param envelope - cloned envelope to be used in the iteration
+ * @param msgCount - total number of messages in the split
+ * @param envelope - envelope to be used in the iteration
* @param o - element which participates in the iteration replacement
* @return newCtx created by the iteration
* @throws AxisFault if there is a message creation failure
@@ -177,9 +171,12 @@
// clone the message for the mediation in iteration
MessageContext newCtx = MessageHelper.cloneMessageContext(synCtx);
+
// set the messageSequence property for possibal aggreagtions
- newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE,
+ newCtx.setProperty(
+ EIPConstants.MESSAGE_SEQUENCE,
msgNumber + EIPConstants.MESSAGE_SEQUENCE_DELEMITER + msgCount);
+
// get a clone of the envelope to be attached
SOAPEnvelope newEnvelope = MessageHelper.cloneSOAPEnvelope(envelope);
@@ -188,24 +185,26 @@
if (preservePayload) {
Object attachElem = attachPath.evaluate(newEnvelope);
- if (attachElem instanceof List) {
+ if (attachElem != null &&
+ attachElem instanceof List && !((List) attachElem).isEmpty()) {
attachElem = ((List) attachElem).get(0);
}
// for the moment attaching element should be an OMElement
- if (attachElem instanceof OMElement) {
+ if (attachElem != null && attachElem instanceof OMElement) {
((OMElement) attachElem).addChild(o);
} else {
handleException("Error in attaching the splitted elements :: " +
"Unable to get the attach path specified by the expression " +
attachPath, synCtx);
}
- // if not preserve payload then attach the iteration element to the body
+
} else if (newEnvelope.getBody() != null) {
+ // if not preserve payload then attach the iteration element to the body
newEnvelope.getBody().addChild(o);
}
- // set the envelope ant mediate as specified in the target
+ // set the envelope and mediate as specified in the target
newCtx.setEnvelope(newEnvelope);
return newCtx;
Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/util/MessageHelper.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/util/MessageHelper.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/util/MessageHelper.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/util/MessageHelper.java Sun Jan 13 07:24:37 2008
@@ -15,7 +15,6 @@
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
-import org.wso2.throttle.ThrottleConstants;
import java.util.Iterator;
import java.util.ArrayList;
@@ -56,7 +55,7 @@
// set the parent corelation details to the cloned MC -
// for the use of aggregation like tasks
- newCtx.setProperty(EIPConstants.AGGREGATE_CORELATION, synCtx.getMessageID());
+ newCtx.setProperty(EIPConstants.AGGREGATE_CORRELATION, synCtx.getMessageID());
// copying the core parameters of the synapse MC
newCtx.setTo(synCtx.getTo());
---------------------------------------------------------------------
To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: synapse-dev-help@ws.apache.org