You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by su...@apache.org on 2011/03/03 17:01:24 UTC
svn commit: r1076660 - in
/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse:
config/xml/ mediators/eip/aggregator/ mediators/eip/splitter/
Author: supun
Date: Thu Mar 3 16:01:24 2011
New Revision: 1076660
URL: http://svn.apache.org/viewvc?rev=1076660&view=rev
Log:
Adding an id to support multiple levels of aggregation for the iterate and clone mediators
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CloneMediatorFactory.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CloneMediatorSerializer.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/IterateMediatorFactory.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/IterateMediatorSerializer.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/CloneMediator.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/IterateMediator.java
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java?rev=1076660&r1=1076659&r2=1076660&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java Thu Mar 3 16:01:24 2011
@@ -70,12 +70,20 @@ public class AggregateMediatorFactory ex
= new QName(XMLConfigConstants.NULL_NAMESPACE, "max");
private static final QName SEQUENCE_Q
= new QName(XMLConfigConstants.NULL_NAMESPACE, "sequence");
+ private static final QName ID_Q
+ = new QName(XMLConfigConstants.NULL_NAMESPACE, "id");
+
public Mediator createSpecificMediator(OMElement elem, Properties properties) {
AggregateMediator mediator = new AggregateMediator();
processAuditStatus(mediator, elem);
+ OMAttribute id = elem.getAttribute(ID_Q);
+ if (id != null) {
+ mediator.setId(id.getAttributeValue());
+ }
+
OMElement corelateOn = elem.getFirstChildWithName(CORELATE_ON_Q);
if (corelateOn != null) {
OMAttribute corelateExpr = corelateOn.getAttribute(EXPRESSION_Q);
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java?rev=1076660&r1=1076659&r2=1076660&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java Thu Mar 3 16:01:24 2011
@@ -43,6 +43,10 @@ public class AggregateMediatorSerializer
OMElement aggregator = fac.createOMElement("aggregate", synNS);
saveTracingState(aggregator, mediator);
+ if (mediator.getId() != null) {
+ aggregator.addAttribute("id", mediator.getId(), nullNS);
+ }
+
if (mediator.getCorrelateExpression() != null) {
OMElement corelateOn = fac.createOMElement("correlateOn", synNS);
SynapseXPathSerializer.serializeXPath(
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CloneMediatorFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CloneMediatorFactory.java?rev=1076660&r1=1076659&r2=1076660&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CloneMediatorFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CloneMediatorFactory.java Thu Mar 3 16:01:24 2011
@@ -58,6 +58,9 @@ public class CloneMediatorFactory extend
private static final QName TARGET_Q
= new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "target");
+ private static final QName ID_Q
+ = new QName(XMLConfigConstants.NULL_NAMESPACE, "id");
+
/**
* This method implements the createMediator method of the MediatorFactory interface
*
@@ -70,7 +73,13 @@ public class CloneMediatorFactory extend
CloneMediator mediator = new CloneMediator();
processAuditStatus(mediator, elem);
-
+
+ OMAttribute id = elem.getAttribute(ID_Q);
+ if (id != null) {
+ mediator.setId(id.getAttributeValue());
+ }
+
+
OMAttribute continueParent = elem.getAttribute(ATT_CONTINUE_PARENT);
if (continueParent != null) {
mediator.setContinueParent(JavaUtils.isTrueExplicitly(
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CloneMediatorSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CloneMediatorSerializer.java?rev=1076660&r1=1076659&r2=1076660&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CloneMediatorSerializer.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CloneMediatorSerializer.java Thu Mar 3 16:01:24 2011
@@ -62,6 +62,10 @@ public class CloneMediatorSerializer ext
cloneElem.addAttribute("continueParent", Boolean.toString(true), nullNS);
}
+ if (clone.getId() != null) {
+ cloneElem.addAttribute("id", clone.getId(), nullNS);
+ }
+
for (Iterator itr = clone.getTargets().iterator(); itr.hasNext();) {
Object o = itr.next();
if (o instanceof Target) {
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/IterateMediatorFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/IterateMediatorFactory.java?rev=1076660&r1=1076659&r2=1076660&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/IterateMediatorFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/IterateMediatorFactory.java Thu Mar 3 16:01:24 2011
@@ -65,6 +65,9 @@ public class IterateMediatorFactory exte
private static final QName ATT_ATTACHPATH = new QName("attachPath");
private static final QName ATT_SEQUENCIAL = new QName("sequential");
+ private static final QName ID_Q
+ = new QName(XMLConfigConstants.NULL_NAMESPACE, "id");
+
/**
* This method will create the IterateMediator by parsing the given xml configuration
*
@@ -77,6 +80,11 @@ public class IterateMediatorFactory exte
IterateMediator mediator = new IterateMediator();
processAuditStatus(mediator, elem);
+ OMAttribute id = elem.getAttribute(ID_Q);
+ if (id != null) {
+ mediator.setId(id.getAttributeValue());
+ }
+
OMAttribute continueParent = elem.getAttribute(ATT_CONTPAR);
if (continueParent != null) {
mediator.setContinueParent(
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/IterateMediatorSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/IterateMediatorSerializer.java?rev=1076660&r1=1076659&r2=1076660&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/IterateMediatorSerializer.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/IterateMediatorSerializer.java Thu Mar 3 16:01:24 2011
@@ -67,6 +67,10 @@ public class IterateMediatorSerializer e
itrElem.addAttribute("continueParent", Boolean.toString(true), nullNS);
}
+ if (itrMed.getId() != null) {
+ itrElem.addAttribute("id", itrMed.getId(), nullNS);
+ }
+
if (itrMed.isPreservePayload()) {
itrElem.addAttribute("preservePayload", Boolean.toString(true), nullNS);
}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java?rev=1076660&r1=1076659&r2=1076660&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java Thu Mar 3 16:01:24 2011
@@ -111,7 +111,8 @@ public class Aggregate extends TimerTask
// get total messages for this group, from the first message we have collected
MessageContext mc = messages.get(0);
- Object prop = mc.getProperty(EIPConstants.MESSAGE_SEQUENCE);
+ Object prop = mc.getProperty(EIPConstants.MESSAGE_SEQUENCE +
+ (aggregateMediator.getId() != null ? "." + aggregateMediator.getId() : ""));
if (prop != null && prop instanceof String) {
String[] msgSequence = prop.toString().split(
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java?rev=1076660&r1=1076659&r2=1076660&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java Thu Mar 3 16:01:24 2011
@@ -81,6 +81,8 @@ public class AggregateMediator extends A
private Map<String, Aggregate> activeAggregates =
Collections.synchronizedMap(new HashMap<String, Aggregate>());
+ private String id = null;
+
/** Lock object to provide the synchronized access to the activeAggregates on checking */
private final Object lock = new Object();
@@ -131,7 +133,8 @@ public class AggregateMediator extends A
try {
Aggregate aggregate = null;
-
+ String correlationIdName = (id != null ? EIPConstants.AGGREGATE_CORRELATION + "." + id :
+ EIPConstants.AGGREGATE_CORRELATION);
// if a correlateExpression is provided and there is a coresponding
// element in the current message prepare to correlate the messages on that
if (correlateExpression != null
@@ -176,13 +179,13 @@ public class AggregateMediator extends A
}
}
- } else if (synCtx.getProperty(EIPConstants.AGGREGATE_CORRELATION) != null) {
+ } else if (synCtx.getProperty(correlationIdName) != null) {
// if the correlattion cannot be found using the correlateExpression then
// try the default which is through the AGGREGATE_CORRELATION message property
// which is the unique original message id of a split or iterate operation and
// which thus can be used to uniquely group messages into aggregates
- Object o = synCtx.getProperty(EIPConstants.AGGREGATE_CORRELATION);
+ Object o = synCtx.getProperty(correlationIdName);
String correlation;
if (o != null && o instanceof String) {
@@ -436,4 +439,12 @@ public class AggregateMediator extends A
public Map getActiveAggregates() {
return activeAggregates;
}
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/CloneMediator.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/CloneMediator.java?rev=1076660&r1=1076659&r2=1076660&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/CloneMediator.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/CloneMediator.java Thu Mar 3 16:01:24 2011
@@ -54,6 +54,8 @@ public class CloneMediator extends Abstr
/** the list of targets to which cloned copies of the message will be given for mediation */
private List<Target> targets = new ArrayList<Target>();
+ private String id = null;
+
/**
* This will implement the mediate method of the Mediator interface and will provide the
* functionality of cloning message into the specified targets and mediation
@@ -109,19 +111,30 @@ public class CloneMediator extends Abstr
* @param synCtx - MessageContext which is subjected to the cloning
* @param messageSequence - the position of this message of the cloned set
* @param messageCount - total of cloned copies
+ *
* @return MessageContext the cloned message context
*/
private MessageContext getClonedMessageContext(MessageContext synCtx, int messageSequence,
- int messageCount) {
+ int messageCount) {
MessageContext newCtx = null;
try {
newCtx = MessageHelper.cloneMessageContext(synCtx);
- // set the property MESSAGE_SEQUENCE to the MC for aggregation purposes
- newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE,
- String.valueOf(messageSequence) + EIPConstants.MESSAGE_SEQUENCE_DELEMITER +
- messageCount);
+ if (id != null) {
+ // set the parent correlation details to the cloned MC -
+ // for the use of aggregation like tasks
+ newCtx.setProperty(EIPConstants.AGGREGATE_CORRELATION + "." + id,
+ synCtx.getMessageID());
+ // set the property MESSAGE_SEQUENCE to the MC for aggregation purposes
+ newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE + "." + id,
+ String.valueOf(messageSequence) + EIPConstants.MESSAGE_SEQUENCE_DELEMITER +
+ messageCount);
+ } else {
+ newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE,
+ String.valueOf(messageSequence) + EIPConstants.MESSAGE_SEQUENCE_DELEMITER +
+ messageCount);
+ }
} catch (AxisFault axisFault) {
handleException("Error cloning the message context", axisFault, synCtx);
}
@@ -153,6 +166,14 @@ public class CloneMediator extends Abstr
this.targets.add(target);
}
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
public void init(SynapseEnvironment se) {
for (Target target : targets) {
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/IterateMediator.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/IterateMediator.java?rev=1076660&r1=1076659&r2=1076660&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/IterateMediator.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/IterateMediator.java Thu Mar 3 16:01:24 2011
@@ -71,6 +71,8 @@ public class IterateMediator extends Abs
/** The target for the newly splitted messages */
private Target target = null;
+ private String id = null;
+
/**
* Splits the message by iterating over the results of the given XPath expression
*
@@ -171,10 +173,20 @@ public class IterateMediator extends Abs
// clone the message for the mediation in iteration
MessageContext newCtx = MessageHelper.cloneMessageContext(synCtx);
- // set the messageSequence property for possibal aggreagtions
- newCtx.setProperty(
- EIPConstants.MESSAGE_SEQUENCE,
- msgNumber + EIPConstants.MESSAGE_SEQUENCE_DELEMITER + msgCount);
+ if (id != null) {
+ // set the parent correlation details to the cloned MC -
+ // for the use of aggregation like tasks
+ newCtx.setProperty(EIPConstants.AGGREGATE_CORRELATION + "." + id,
+ synCtx.getMessageID());
+ // set the messageSequence property for possible aggregations
+ newCtx.setProperty(
+ EIPConstants.MESSAGE_SEQUENCE + "." + id,
+ msgNumber + EIPConstants.MESSAGE_SEQUENCE_DELEMITER + msgCount);
+ } else {
+ newCtx.setProperty(
+ EIPConstants.MESSAGE_SEQUENCE,
+ msgNumber + EIPConstants.MESSAGE_SEQUENCE_DELEMITER + msgCount);
+ }
// get a clone of the envelope to be attached
SOAPEnvelope newEnvelope = MessageHelper.cloneSOAPEnvelope(envelope);
@@ -253,6 +265,14 @@ public class IterateMediator extends Abs
this.target = target;
}
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
public void init(SynapseEnvironment se) {
if (target != null) {
Endpoint endpoint = target.getEndpoint();