You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by su...@apache.org on 2011/03/03 17:01:24 UTC

svn commit: r1076660 - in /synapse/trunk/java/modules/core/src/main/java/org/apache/synapse: config/xml/ mediators/eip/aggregator/ mediators/eip/splitter/

Author: supun
Date: Thu Mar  3 16:01:24 2011
New Revision: 1076660

URL: http://svn.apache.org/viewvc?rev=1076660&view=rev
Log:
adding an id to have multiple levels of aggregation for the iterate clone mediators

Modified:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CloneMediatorFactory.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CloneMediatorSerializer.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/IterateMediatorFactory.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/IterateMediatorSerializer.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/CloneMediator.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/IterateMediator.java

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java?rev=1076660&r1=1076659&r2=1076660&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java Thu Mar  3 16:01:24 2011
@@ -70,12 +70,20 @@ public class AggregateMediatorFactory ex
             = new QName(XMLConfigConstants.NULL_NAMESPACE, "max");
     private static final QName SEQUENCE_Q
             = new QName(XMLConfigConstants.NULL_NAMESPACE, "sequence");
+    private static final QName ID_Q
+            = new QName(XMLConfigConstants.NULL_NAMESPACE, "id");
+
 
     public Mediator createSpecificMediator(OMElement elem, Properties properties) {
 
         AggregateMediator mediator = new AggregateMediator();
         processAuditStatus(mediator, elem);
 
+        OMAttribute id = elem.getAttribute(ID_Q);
+        if (id != null) {
+            mediator.setId(id.getAttributeValue());
+        }
+
         OMElement corelateOn = elem.getFirstChildWithName(CORELATE_ON_Q);
         if (corelateOn != null) {
             OMAttribute corelateExpr = corelateOn.getAttribute(EXPRESSION_Q);

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java?rev=1076660&r1=1076659&r2=1076660&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java Thu Mar  3 16:01:24 2011
@@ -43,6 +43,10 @@ public class AggregateMediatorSerializer
         OMElement aggregator = fac.createOMElement("aggregate", synNS);
         saveTracingState(aggregator, mediator);
 
+        if (mediator.getId() != null) {
+            aggregator.addAttribute("id", mediator.getId(), nullNS);
+        }
+
         if (mediator.getCorrelateExpression() != null) {
             OMElement corelateOn = fac.createOMElement("correlateOn", synNS);
             SynapseXPathSerializer.serializeXPath(

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CloneMediatorFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CloneMediatorFactory.java?rev=1076660&r1=1076659&r2=1076660&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CloneMediatorFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CloneMediatorFactory.java Thu Mar  3 16:01:24 2011
@@ -58,6 +58,9 @@ public class CloneMediatorFactory extend
     private static final QName TARGET_Q
             = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "target");
 
+    private static final QName ID_Q
+            = new QName(XMLConfigConstants.NULL_NAMESPACE, "id");
+
     /**
      * This method implements the createMediator method of the MediatorFactory interface
      * 
@@ -70,7 +73,13 @@ public class CloneMediatorFactory extend
 
         CloneMediator mediator = new CloneMediator();
         processAuditStatus(mediator, elem);
-        
+
+        OMAttribute id = elem.getAttribute(ID_Q);
+        if (id != null) {
+            mediator.setId(id.getAttributeValue());
+        }
+
+
         OMAttribute continueParent = elem.getAttribute(ATT_CONTINUE_PARENT);
         if (continueParent != null) {
             mediator.setContinueParent(JavaUtils.isTrueExplicitly(

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CloneMediatorSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CloneMediatorSerializer.java?rev=1076660&r1=1076659&r2=1076660&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CloneMediatorSerializer.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CloneMediatorSerializer.java Thu Mar  3 16:01:24 2011
@@ -62,6 +62,10 @@ public class CloneMediatorSerializer ext
             cloneElem.addAttribute("continueParent", Boolean.toString(true), nullNS);
         }
 
+        if (clone.getId() != null) {
+            cloneElem.addAttribute("id", clone.getId(), nullNS);
+        }
+
         for (Iterator itr = clone.getTargets().iterator(); itr.hasNext();) {
             Object o = itr.next();
             if (o instanceof Target) {

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/IterateMediatorFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/IterateMediatorFactory.java?rev=1076660&r1=1076659&r2=1076660&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/IterateMediatorFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/IterateMediatorFactory.java Thu Mar  3 16:01:24 2011
@@ -65,6 +65,9 @@ public class IterateMediatorFactory exte
     private static final QName ATT_ATTACHPATH = new QName("attachPath");
     private static final QName ATT_SEQUENCIAL = new QName("sequential");
 
+    private static final QName ID_Q
+            = new QName(XMLConfigConstants.NULL_NAMESPACE, "id");
+
     /**
      * This method will create the IterateMediator by parsing the given xml configuration
      *
@@ -77,6 +80,11 @@ public class IterateMediatorFactory exte
         IterateMediator mediator = new IterateMediator();
         processAuditStatus(mediator, elem);
 
+        OMAttribute id = elem.getAttribute(ID_Q);
+        if (id != null) {
+            mediator.setId(id.getAttributeValue());
+        }
+
         OMAttribute continueParent = elem.getAttribute(ATT_CONTPAR);
         if (continueParent != null) {
             mediator.setContinueParent(

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/IterateMediatorSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/IterateMediatorSerializer.java?rev=1076660&r1=1076659&r2=1076660&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/IterateMediatorSerializer.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/IterateMediatorSerializer.java Thu Mar  3 16:01:24 2011
@@ -67,6 +67,10 @@ public class IterateMediatorSerializer e
             itrElem.addAttribute("continueParent", Boolean.toString(true), nullNS);
         }
 
+        if (itrMed.getId() != null) {
+            itrElem.addAttribute("id", itrMed.getId(), nullNS);
+        }
+
         if (itrMed.isPreservePayload()) {
             itrElem.addAttribute("preservePayload", Boolean.toString(true), nullNS);
         }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java?rev=1076660&r1=1076659&r2=1076660&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java Thu Mar  3 16:01:24 2011
@@ -111,7 +111,8 @@ public class Aggregate extends TimerTask
 
                 // get total messages for this group, from the first message we have collected
                 MessageContext mc = messages.get(0);
-                Object prop = mc.getProperty(EIPConstants.MESSAGE_SEQUENCE);
+                Object prop = mc.getProperty(EIPConstants.MESSAGE_SEQUENCE +
+                        (aggregateMediator.getId() != null ? "." + aggregateMediator.getId() : ""));
             
                 if (prop != null && prop instanceof String) {
                     String[] msgSequence = prop.toString().split(

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java?rev=1076660&r1=1076659&r2=1076660&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java Thu Mar  3 16:01:24 2011
@@ -81,6 +81,8 @@ public class AggregateMediator extends A
     private Map<String, Aggregate> activeAggregates =
         Collections.synchronizedMap(new HashMap<String, Aggregate>());
 
+    private String id = null;
+
     /** Lock object to provide the synchronized access to the activeAggregates on checking */
     private final Object lock = new Object();
 
@@ -131,7 +133,8 @@ public class AggregateMediator extends A
 
         try {
             Aggregate aggregate = null;
-
+            String correlationIdName = (id != null ? EIPConstants.AGGREGATE_CORRELATION + "." + id :
+                    EIPConstants.AGGREGATE_CORRELATION);
             // if a correlateExpression is provided and there is a coresponding
             // element in the current message prepare to correlate the messages on that
             if (correlateExpression != null
@@ -176,13 +179,13 @@ public class AggregateMediator extends A
                     }
                 }
 
-            } else if (synCtx.getProperty(EIPConstants.AGGREGATE_CORRELATION) != null) {
+            } else if (synCtx.getProperty(correlationIdName) != null) {
                 // if the correlattion cannot be found using the correlateExpression then
                 // try the default which is through the AGGREGATE_CORRELATION message property
                 // which is the unique original message id of a split or iterate operation and
                 // which thus can be used to uniquely group messages into aggregates
 
-                Object o = synCtx.getProperty(EIPConstants.AGGREGATE_CORRELATION);
+                Object o = synCtx.getProperty(correlationIdName);
                 String correlation;
 
                 if (o != null && o instanceof String) {
@@ -436,4 +439,12 @@ public class AggregateMediator extends A
     public Map getActiveAggregates() {
         return activeAggregates;
     }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
 }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/CloneMediator.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/CloneMediator.java?rev=1076660&r1=1076659&r2=1076660&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/CloneMediator.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/CloneMediator.java Thu Mar  3 16:01:24 2011
@@ -54,6 +54,8 @@ public class CloneMediator extends Abstr
     /** the list of targets to which cloned copies of the message will be given for mediation */
     private List<Target> targets = new ArrayList<Target>();
 
+    private String id = null;
+
     /**
      * This will implement the mediate method of the Mediator interface and will provide the
      * functionality of cloning message into the specified targets and mediation
@@ -109,19 +111,30 @@ public class CloneMediator extends Abstr
      * @param synCtx          - MessageContext which is subjected to the cloning
      * @param messageSequence - the position of this message of the cloned set
      * @param messageCount    - total of cloned copies
+     *
      * @return MessageContext the cloned message context
      */
     private MessageContext getClonedMessageContext(MessageContext synCtx, int messageSequence,
-        int messageCount) {
+                                                   int messageCount) {
 
         MessageContext newCtx = null;
         try {
             newCtx = MessageHelper.cloneMessageContext(synCtx);
 
-            // set the property MESSAGE_SEQUENCE to the MC for aggregation purposes
-            newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE,
-                String.valueOf(messageSequence) + EIPConstants.MESSAGE_SEQUENCE_DELEMITER +
-                messageCount);            
+            if (id != null) {
+                // set the parent correlation details to the cloned MC -
+                //                              for the use of aggregation like tasks
+                newCtx.setProperty(EIPConstants.AGGREGATE_CORRELATION + "." + id,
+                        synCtx.getMessageID());
+                // set the property MESSAGE_SEQUENCE to the MC for aggregation purposes
+                newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE + "." + id,
+                        String.valueOf(messageSequence) + EIPConstants.MESSAGE_SEQUENCE_DELEMITER +
+                                messageCount);
+            } else {
+                newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE,
+                        String.valueOf(messageSequence) + EIPConstants.MESSAGE_SEQUENCE_DELEMITER +
+                                messageCount);
+            }
         } catch (AxisFault axisFault) {
             handleException("Error cloning the message context", axisFault, synCtx);
         }
@@ -153,6 +166,14 @@ public class CloneMediator extends Abstr
         this.targets.add(target);
     }
 
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
     public void init(SynapseEnvironment se) {
 
         for (Target target : targets) {

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/IterateMediator.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/IterateMediator.java?rev=1076660&r1=1076659&r2=1076660&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/IterateMediator.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/IterateMediator.java Thu Mar  3 16:01:24 2011
@@ -71,6 +71,8 @@ public class IterateMediator extends Abs
     /** The target for the newly splitted messages */
     private Target target = null;
 
+    private String id = null;
+
     /**
      * Splits the message by iterating over the results of the given XPath expression
      *
@@ -171,10 +173,20 @@ public class IterateMediator extends Abs
         // clone the message for the mediation in iteration
         MessageContext newCtx = MessageHelper.cloneMessageContext(synCtx);
 
-        // set the messageSequence property for possibal aggreagtions
-        newCtx.setProperty(
-            EIPConstants.MESSAGE_SEQUENCE,
-            msgNumber + EIPConstants.MESSAGE_SEQUENCE_DELEMITER + msgCount);
+        if (id != null) {
+            // set the parent correlation details to the cloned MC -
+            //                              for the use of aggregation like tasks
+            newCtx.setProperty(EIPConstants.AGGREGATE_CORRELATION + "." + id,
+                    synCtx.getMessageID());
+            // set the messageSequence property for possibal aggreagtions
+            newCtx.setProperty(
+                    EIPConstants.MESSAGE_SEQUENCE + "." + id,
+                    msgNumber + EIPConstants.MESSAGE_SEQUENCE_DELEMITER + msgCount);
+        } else {
+            newCtx.setProperty(
+                    EIPConstants.MESSAGE_SEQUENCE,
+                    msgNumber + EIPConstants.MESSAGE_SEQUENCE_DELEMITER + msgCount);
+        }
 
         // get a clone of the envelope to be attached
         SOAPEnvelope newEnvelope = MessageHelper.cloneSOAPEnvelope(envelope);
@@ -253,6 +265,14 @@ public class IterateMediator extends Abs
         this.target = target;
     }
 
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
     public void init(SynapseEnvironment se) {
         if (target != null) {
             Endpoint endpoint = target.getEndpoint();