You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@synapse.apache.org by as...@apache.org on 2008/01/13 16:24:37 UTC

svn commit: r611582 - in /webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse: config/xml/ mediators/ mediators/eip/ mediators/eip/aggregator/ mediators/eip/splitter/ util/

Author: asankha
Date: Sun Jan 13 07:24:37 2008
New Revision: 611582

URL: http://svn.apache.org/viewvc?rev=611582&view=rev
Log:
fix issues with the aggrgate mediator and review code and fix issues with EIP mediators package

Modified:
    webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java
    webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java
    webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/TargetFactory.java
    webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/TargetSerializer.java
    webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java
    webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPConstants.java
    webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java
    webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/Target.java
    webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
    webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java
    webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/CloneMediator.java
    webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/IterateMediator.java
    webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/util/MessageHelper.java

Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java Sun Jan 13 07:24:37 2008
@@ -20,7 +20,6 @@
 package org.apache.synapse.config.xml;
 
 import org.apache.synapse.Mediator;
-import org.apache.synapse.SynapseException;
 import org.apache.synapse.mediators.eip.aggregator.AggregateMediator;
 import org.apache.synapse.mediators.builtin.DropMediator;
 import org.apache.synapse.mediators.base.SequenceMediator;
@@ -58,9 +57,6 @@
     private static final QName MESSAGE_COUNT_Q
             = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "messageCount");
     private static final QName ON_COMPLETE_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "onComplete");
-    private static final QName INVALIDATE_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "invalidate");
-
-    private static final QName TIME_TO_LIVE_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "timeToLive");
     private static final QName EXPRESSION_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "expression");
     private static final QName TIMEOUT_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "timeout");
     private static final QName MIN_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "min");
@@ -71,11 +67,6 @@
 
         AggregateMediator mediator = new AggregateMediator();
         processTraceState(mediator, elem);
-        // todo: need to fix
-        OMAttribute timeToLive = elem.getAttribute(TIME_TO_LIVE_Q);
-        if (timeToLive != null) {
-            mediator.setTimeToInvalidate(Long.parseLong(timeToLive.getAttributeValue()) * 1000);
-        }
 
         OMElement corelateOn = elem.getFirstChildWithName(CORELATE_ON_Q);
         if (corelateOn != null) {
@@ -84,7 +75,7 @@
                 try {
                     AXIOMXPath xp = new AXIOMXPath(corelateExpr.getAttributeValue());
                     OMElementUtils.addNameSpaces(xp, corelateOn, log);
-                    mediator.setCorelateExpression(xp);
+                    mediator.setCorrelateExpression(xp);
                 } catch (JaxenException e) {
                     handleException("Unable to load the corelate XPATH expression", e);
                 }
@@ -95,7 +86,7 @@
         if (completeCond != null) {
             OMAttribute completeTimeout = completeCond.getAttribute(TIMEOUT_Q);
             if (completeTimeout != null) {
-                mediator.setCompleteTimeout(
+                mediator.setCompletionTimeoutMillis(
                         Long.parseLong(completeTimeout.getAttributeValue()) * 1000);
             }
 
@@ -110,24 +101,6 @@
                 if (max != null) {
                     mediator.setMaxMessagesToComplete(Integer.parseInt(max.getAttributeValue()));
                 }
-            }
-        }
-
-        OMElement invalidate = elem.getFirstChildWithName(INVALIDATE_Q);
-        if (invalidate != null) {
-            OMAttribute sequenceRef = invalidate.getAttribute(SEQUENCE_Q);
-            if (sequenceRef != null) {
-                mediator.setInvalidMsgSequenceRef(sequenceRef.getAttributeValue());
-            } else if (invalidate.getFirstElement() != null) {
-                mediator.setInvalidMsgSequence(
-                        (new SequenceMediatorFactory()).createAnonymousSequence(invalidate));
-            }
-
-            OMAttribute timeout = invalidate.getAttribute(TIMEOUT_Q);
-            if (timeout != null) {
-                mediator.setInvlidateToDestroyTime(Long.parseLong(timeout.getAttributeValue()));
-            } else {
-                mediator.setInvlidateToDestroyTime(300);
             }
         }
 

Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java Sun Jan 13 07:24:37 2008
@@ -20,11 +20,8 @@
 package org.apache.synapse.config.xml;
 
 import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMAttribute;
 import org.apache.synapse.Mediator;
-import org.apache.synapse.SynapseException;
 import org.apache.synapse.mediators.eip.aggregator.AggregateMediator;
-import org.apache.synapse.mediators.ext.ClassMediator;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -55,16 +52,16 @@
         OMElement aggregator = fac.createOMElement("aggregate", synNS);
         saveTracingState(aggregator, mediator);
 
-        if (mediator.getCorelateExpression() != null) {
+        if (mediator.getCorrelateExpression() != null) {
             OMElement corelateOn = fac.createOMElement("corelateOn", synNS);
-            corelateOn.addAttribute("expression", mediator.getCorelateExpression().toString(), nullNS);
-            super.serializeNamespaces(corelateOn, mediator.getCorelateExpression());
+            corelateOn.addAttribute("expression", mediator.getCorrelateExpression().toString(), nullNS);
+            super.serializeNamespaces(corelateOn, mediator.getCorrelateExpression());
             aggregator.addChild(corelateOn);
         }
 
         OMElement completeCond = fac.createOMElement("completeCondition", synNS);
-        if (mediator.getCompleteTimeout() != 0) {
-            completeCond.addAttribute("timeout", Long.toString(mediator.getCompleteTimeout()), nullNS);
+        if (mediator.getCompletionTimeoutMillis() != 0) {
+            completeCond.addAttribute("timeout", Long.toString(mediator.getCompletionTimeoutMillis()), nullNS);
         }
         OMElement messageCount = fac.createOMElement("messageCount", synNS);
         if (mediator.getMinMessagesToComplete() != 0) {
@@ -88,16 +85,6 @@
                     onCompleteElem, mediator.getOnCompleteSequence().getList());
         }
         aggregator.addChild(onCompleteElem);
-
-        OMElement invalidateElem = fac.createOMElement("invalidate", synNS);
-        invalidateElem.addAttribute("timeout", Long.toString(mediator.getInvlidateToDestroyTime()), nullNS);
-        if (mediator.getInvalidMsgSequenceRef() != null) {
-            invalidateElem.addAttribute("sequence", mediator.getInvalidMsgSequenceRef(), nullNS);
-        } else if (mediator.getInvalidMsgSequence() != null) {
-            new SequenceMediatorSerializer().serializeChildren(
-                    invalidateElem, mediator.getInvalidMsgSequence().getList());
-        }
-        aggregator.addChild(invalidateElem);
 
         if (parent != null) {
             parent.addChild(aggregator);

Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/TargetFactory.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/TargetFactory.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/TargetFactory.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/TargetFactory.java Sun Jan 13 07:24:37 2008
@@ -64,7 +64,7 @@
         Target target = new Target();
         OMAttribute toAttr = elem.getAttribute(new QName(XMLConfigConstants.NULL_NAMESPACE, "to"));
         if (toAttr != null && toAttr.getAttributeValue() != null) {
-            target.setTo(toAttr.getAttributeValue());
+            target.setToAddress(toAttr.getAttributeValue());
         }
 
         OMAttribute soapAction = elem.getAttribute(

Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/TargetSerializer.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/TargetSerializer.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/TargetSerializer.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/config/xml/TargetSerializer.java Sun Jan 13 07:24:37 2008
@@ -61,8 +61,8 @@
     public static OMElement serializeTarget(Target target) {
 
         OMElement targetElem = fac.createOMElement("target", synNS);
-        if (target.getTo() != null) {
-            targetElem.addAttribute("to", target.getTo(), nullNS);
+        if (target.getToAddress() != null) {
+            targetElem.addAttribute("to", target.getToAddress(), nullNS);
         }
 
         if (target.getSoapAction() != null) {

Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java Sun Jan 13 07:24:37 2008
@@ -68,7 +68,7 @@
     public void run() {
         try {
             seq.mediate(synCtx);
-            ((Axis2MessageContext)synCtx).getAxis2MessageContext().getEnvelope().discard();
+            //((Axis2MessageContext)synCtx).getAxis2MessageContext().getEnvelope().discard();
 
         } catch (SynapseException syne) {
             if (!synCtx.getFaultStack().isEmpty()) {
@@ -81,7 +81,7 @@
             }
 
         } catch (Exception e) {
-            String msg = "Unexpected error executing task";
+            String msg = "Unexpected error executing task/async inject";
             log.error(msg, e);
             if (synCtx.getServiceLog() != null) {
                 synCtx.getServiceLog().error(msg, e);

Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPConstants.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPConstants.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPConstants.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPConstants.java Sun Jan 13 07:24:37 2008
@@ -19,11 +19,13 @@
 
 package org.apache.synapse.mediators.eip;
 
-/** Holds all the constants related to the eip mediators */
+/** Constants related to the EIP mediators */
 public final class EIPConstants {
 
-    /** Constant for the correlation property key */
-    public static final String AGGREGATE_CORELATION = "aggregateCorelation";
+    /** Typically the message ID of the parent message in a split/iterate etc so that
+     * its children could be uniquely aggregated by the aggrgate mediator etc
+     */
+    public static final String AGGREGATE_CORRELATION = "aggregateCorelation";
 
     /** Constant for the message sequence property key */
     public static final String MESSAGE_SEQUENCE = "messageSequence";

Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java Sun Jan 13 07:24:37 2008
@@ -32,23 +32,19 @@
 import java.util.List;
 
 /**
- * Utility methods for the EIP implementations
+ * Utility methods for the EIP mediators
  */
 public class EIPUtils {
 
-    /**
-     * This will be used for logging purposes
-     */
     private static final Log log = LogFactory.getLog(EIPUtils.class);
 
     /**
-     * This static util method will be used to extract out the set of all elements described by the
-     * given XPath over the given SOAPEnvelope
+     * Return the set of elements specified by the XPath over the given envelope
      *
-     * @param envelope   SOAPEnvelope from which the the elements will be extracted
-     * @param expression AXIOMXPath expression describing the elements
-     * @return List of OMElements in the envelope matching the expression
-     * @throws JaxenException if the XPath expression evaluation fails for some reason
+     * @param envelope SOAPEnvelope from which the elements will be extracted
+     * @param expression AXIOMXPath expression describing the elements to be extracted
+     * @return List OMElements in the envelope matching the expression
+     * @throws JaxenException if the XPath expression evaluation fails
      */
     public static List getMatchingElements(SOAPEnvelope envelope, AXIOMXPath expression)
         throws JaxenException {
@@ -61,15 +57,17 @@
         } else if (o instanceof List) {
             return (List) o;
         } else {
-            return null;
+            return new ArrayList();
         }
     }
 
     /**
-     * @param envelope
-     * @param expression
-     * @return
-     * @throws JaxenException
+     * Return the set of detached elements specified by the XPath over the given envelope
+     *
+     * @param envelope SOAPEnvelope from which the elements will be extracted
+     * @param expression AXIOMXPath expression describing the elements to be extracted
+     * @return List detached OMElements in the envelope matching the expression
+     * @throws JaxenException if the XPath expression evaluation fails
      */
     public static List getDetachedMatchingElements(SOAPEnvelope envelope, AXIOMXPath expression)
         throws JaxenException {
@@ -90,8 +88,8 @@
     }
 
     /**
-     * This static util method will be used to enrich the envelope passed, by the element described
-     * by the XPath over the enricher envelope
+     * Merge two SOAP envelopes using the given XPath expression that specifies the
+     * element that enriches the first envelope from the second
      *
      * @param envelope   SOAPEnvelope to be enriched with the content
      * @param enricher   SOAPEnvelope from which the enriching element will be extracted
@@ -99,12 +97,18 @@
      */
     public static void enrichEnvelope(SOAPEnvelope envelope, SOAPEnvelope enricher,
         AXIOMXPath expression) throws JaxenException {
+
         OMElement enrichingElement;
-        Object o = getMatchingElements(envelope, expression);
-        if (o != null && o instanceof List && !((List) o).isEmpty()) {
-            o = ((List) o).get(0);
+        List elementList = getMatchingElements(envelope, expression);
+
+        if (elementList != null && !elementList.isEmpty()) {
+
+            // attach at parent of the first result from the XPath, or to the SOAPBody
+            Object o = elementList.get(0);
 
-            if (o instanceof OMElement && ((OMElement) o).getParent() instanceof OMElement) {
+            if (o instanceof OMElement &&
+                ((OMElement) o).getParent() != null &&
+                ((OMElement) o).getParent() instanceof OMElement) {
                 enrichingElement = (OMElement) ((OMElement) o).getParent();
             } else {
                 enrichingElement = envelope.getBody();

Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/Target.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/Target.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/Target.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/Target.java Sun Jan 13 07:24:37 2008
@@ -19,49 +19,37 @@
 
 package org.apache.synapse.mediators.eip;
 
-import org.apache.synapse.Mediator;
 import org.apache.synapse.MessageContext;
 import org.apache.synapse.endpoints.Endpoint;
 import org.apache.synapse.mediators.base.SequenceMediator;
 import org.apache.axis2.addressing.EndpointReference;
 
 /**
- * This class will be a bean which carries the target information for most of the EIP mediators
+ * A bean class that holds the target (i.e. sequence or endpoint) information for a message
+ * as used by common EIP mediators
  */
 public class Target {
 
-    /**
-     * Holds the to address of the target endpoint
-     */
-    private String to = null;
+    /** An optional To address to be set on the message when handing over to the target */
+    private String toAddress = null;
 
-    /**
-     * Holds the soapAction of the target service
-     */
+    /** An optional Action to be set on the message when handing over to the target */
     private String soapAction = null;
 
-    /**
-     * Holds the target mediation sequence as an anonymous sequence
-     */
+    /** The inlined target sequence definition */
     private SequenceMediator sequence = null;
 
-    /**
-     * Holds the target mediation sequence as a sequence reference
-     */
+    /** The target sequence reference key */
     private String sequenceRef = null;
 
-    /**
-     * Holds the target endpoint to which the message will be sent
-     */
+    /** The inlined target endpoint definition */
     private Endpoint endpoint = null;
 
-    /**
-     * Holds the reference to the target endpoint to which the message will be sent
-     */
+    /** The target endpoint reference key */
     private String endpointRef = null;
 
     /**
-     * This method will be called by the EIP mediators to mediated the target (may be to mediate
+     * process the message through this target (may be to mediate
      * using the target sequence, send message to the target endpoint or both)
      *
      * @param synCtx - MessageContext to be mediated
@@ -72,14 +60,16 @@
             synCtx.setSoapAction(soapAction);
         }
 
-        if (to != null) {
+        if (toAddress != null) {
             if (synCtx.getTo() != null) {
-                synCtx.getTo().setAddress(to);
+                synCtx.getTo().setAddress(toAddress);
             } else {
-                synCtx.setTo(new EndpointReference(to));
+                synCtx.setTo(new EndpointReference(toAddress));
             }
         }
 
+        // since we are injecting the new messages asynchronously, we cannot process a message
+        // through a sequence and then again with an endpoint
         if (sequence != null) {
             synCtx.getEnvironment().injectAsync(synCtx, sequence);
         } else if (sequenceRef != null) {
@@ -87,9 +77,7 @@
             if (refSequence != null) {
                 synCtx.getEnvironment().injectAsync(synCtx, refSequence);
             }
-        }
-
-        if (endpoint != null) {
+        } else if (endpoint != null) {
             endpoint.send(synCtx);
         } else if (endpointRef != null) {
             Endpoint epr = synCtx.getConfiguration().getEndpoint(endpointRef);
@@ -97,19 +85,18 @@
                 epr.send(synCtx);
             }
         }
-
     }
 
     ///////////////////////////////////////////////////////////////////////////////////////
     //                        Getters and Setters                                        //
     ///////////////////////////////////////////////////////////////////////////////////////
 
-    public String getTo() {
-        return to;
+    public String getToAddress() {
+        return toAddress;
     }
 
-    public void setTo(String to) {
-        this.to = to;
+    public void setToAddress(String toAddress) {
+        this.toAddress = toAddress;
     }
 
     public String getSoapAction() {

Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java Sun Jan 13 07:24:37 2008
@@ -30,83 +30,59 @@
 import java.util.TimerTask;
 
 /**
- * This holds the Aggregate properties and the list of messages which participate in the aggregation
+ * An instance of this class is created to manage each aggregation group, and it holds
+ * the aggregation properties and the messages collected during aggregation. This class also
+ * times out itself after the timeout expires it
  */
 public class Aggregate extends TimerTask {
 
-    /**
-     *
-     */
     private static final Log log = LogFactory.getLog(Aggregate.class);
-
-    /**
-     *
-     */
     private static final Log trace = LogFactory.getLog(SynapseConstants.TRACE_LOGGER);
 
-    /**
-     *
-     */
-    private long timeout = 0;
-
-    /**
-     *
-     */
-    private long expireTime = 0;
-
-    /**
-     *
-     */
+    private long timeoutMillis = 0;
+    /** The time in millis at which this aggregation should be considered as expired */
+    private long expiryTimeMillis = 0;
+    /** The minimum number of messages to be collected to consider this aggregation as complete */
     private int minCount = -1;
-
-    /**
-     *
-     */
+    /** The maximum number of messages that should be collected by this aggregation */
     private int maxCount = -1;
-
-    /**
-     *
-     */
-    private String corelation = null;
-
-    private AggregateMediator mediator = null;
-
-    /**
-     *
-     */
+    private String correlation = null;
+    /** The AggregateMediator that should be invoked on completion of the aggregation */
+    private AggregateMediator aggregateMediator = null;
     private List<MessageContext> messages = new ArrayList<MessageContext>();
 
     /**
-     * This is the constructor of the Aggregate which will set the timeout depending on the
-     * timeout for the aggregate
+     * Save aggregation properties and timeout
      *
-     * @param corelation - String representing the corelation name of the messages in the aggregate
-     * @param timeout -
-     * @param min -
-     * @param max -
-     * @param mediator -
-     */
-    public Aggregate(String corelation, long timeout, int min, int max, AggregateMediator mediator) {
-        this.corelation = corelation;
-        if (timeout > 0) {
-            this.timeout = System.currentTimeMillis() + expireTime;
+     * @param corelation representing the corelation name of the messages in the aggregate
+     * @param timeoutMillis the timeout duration in milliseconds
+     * @param min the minimum number of messages to be aggregated
+     * @param max the maximum number of messages to be aggregated
+     * @param mediator
+     */
+    public Aggregate(String corelation, long timeoutMillis, int min, int max, AggregateMediator mediator) {
+        this.correlation = corelation;
+        if (timeoutMillis > 0) {
+            expiryTimeMillis = System.currentTimeMillis() + timeoutMillis;
         }
         if (min > 0) {
-            this.minCount = min;
+            minCount = min;
         }
         if (max > 0) {
-            this.maxCount = max;
+            maxCount = max;
         }
-        this.mediator = mediator;
+        this.aggregateMediator = mediator;
     }
 
     /**
-     * @param synCtx -
-     * @return true if the message was added and false if not
+     * Add a message to the interlan message list
+     *
+     * @param synCtx message to be added into this aggregation group
+     * @return true if the message was added or false if not
      */
     public boolean addMessage(MessageContext synCtx) {
-        if (this.maxCount > 0 && this.messages.size() < this.maxCount || this.maxCount <= 0) {
-            this.messages.add(synCtx);
+        if (maxCount <= 0 || (maxCount > 0 && messages.size() < maxCount)) {
+            messages.add(synCtx);
             return true;
         } else {
             return false;
@@ -114,42 +90,87 @@
     }
 
     /**
-     * @return boolean stating the completeness of the corelation
+     * Has this aggregation group completed?
+     *
+     * @return boolean true if aggregation is complete
      */
-    public boolean isComplete() {
+    public boolean isComplete(boolean traceOn, boolean traceOrDebugOn, Log trace, Log log) {
 
-        boolean completed = false;
+        // if any messages have been collected, check if the completion criteria is met
         if (!messages.isEmpty()) {
 
-            Object o = messages.get(0);
-            if (o instanceof MessageContext) {
-
-                Object prop = ((MessageContext) o).getProperty(EIPConstants.MESSAGE_SEQUENCE);
-                if (prop instanceof String) {
+            // get total messages for this group, from the first message we have collected
+            MessageContext mc = messages.get(0);
+            Object prop = mc.getProperty(EIPConstants.MESSAGE_SEQUENCE);
+            
+            if (prop != null && prop instanceof String) {
+                String[] msgSequence = prop.toString().split(EIPConstants.MESSAGE_SEQUENCE_DELEMITER);
+                int total = Integer.parseInt(msgSequence[1]);
+
+                if (traceOrDebugOn) {
+                    traceOrDebug(traceOn, trace, log, messages.size() +
+                        " messages of " + total + " collected in current aggregation");
+                }
 
-                    String[] msgSequence
-                            = prop.toString().split(EIPConstants.MESSAGE_SEQUENCE_DELEMITER);
-                    if (messages.size() >= Integer.parseInt(msgSequence[1])) {
-                        completed = true;
+                if (messages.size() >= total) {
+                    if (traceOrDebugOn) {
+                        traceOrDebug(traceOn, trace, log, "Aggregation complete");
                     }
+                    return true;
                 }
             }
+        } else {
+            if (traceOrDebugOn) {
+                traceOrDebug(traceOn, trace, log, "No messages collected in current aggregation");
+            }
         }
 
-        if (!completed && this.minCount > 0) {
-            completed = this.messages.size() >= this.minCount
-                    || this.timeout < System.currentTimeMillis();
+        // if the minimum number of messages has been reached, its complete
+        if (minCount > 0 && messages.size() >= minCount) {
+            if (traceOrDebugOn) {
+                traceOrDebug(traceOn, trace, log,
+                    "Aggregation complete - the minimum : " + minCount + " messages has been reached");
+            }
+            return true;
         }
 
-        return completed;
+        if (maxCount > 0 && messages.size() >= maxCount) {
+            if (traceOrDebugOn) {
+                traceOrDebug(traceOn, trace, log,
+                    "Aggregation complete - the maximum : " + maxCount + " messages has been reached");
+            }
+
+            return true;
+        }
+
+        // else, has this aggregation reached its timeout?
+        if (System.currentTimeMillis() >= expiryTimeMillis) {
+            if (traceOrDebugOn) {
+                traceOrDebug(traceOn, trace, log,
+                    "Aggregation complete - the aggregation has timed out");
+            }
+
+            return true;
+        }
+        
+        return false;
+    }
+
+    private void traceOrDebug(boolean traceOn, Log trace, Log log, String msg) {
+        if (traceOn) {
+            trace.info(msg);
+        }
+        if (log.isDebugEnabled()) {
+            log.debug(msg);
+        }
     }
 
-    public long getTimeout() {
-        return timeout;
+    public long getTimeoutMillis() {
+        return timeoutMillis;
     }
 
-    public void setTimeout(long timeout) {
-        this.timeout = timeout;
+    public void setTimeoutMillis(long timeoutMillis) {
+        this.timeoutMillis = timeoutMillis;
     }
 
     public int getMinCount() {
@@ -168,15 +189,15 @@
         this.maxCount = maxCount;
     }
 
-    public String getCorelation() {
-        return corelation;
+    public String getCorrelation() {
+        return correlation;
     }
 
-    public void setCorelation(String corelation) {
-        this.corelation = corelation;
+    public void setCorrelation(String correlation) {
+        this.correlation = correlation;
     }
 
-    public List getMessages() {
+    public List<MessageContext> getMessages() {
         return messages;
     }
 
@@ -184,15 +205,19 @@
         this.messages = messages;
     }
 
-    public long getExpireTime() {
-        return expireTime;
+    public long getExpiryTimeMillis() {
+        return expiryTimeMillis;
     }
 
-    public void setExpireTime(long expireTime) {
-        this.expireTime = expireTime;
+    public void setExpiryTimeMillis(long expiryTimeMillis) {
+        this.expiryTimeMillis = expiryTimeMillis;
     }
 
     public void run() {
-        mediator.completeAggregate(this);
+        if (log.isDebugEnabled()) {
+            log.debug("Time : " + System.currentTimeMillis() + " and this aggregator expired at : " +
+                expiryTimeMillis);
+        }
+        aggregateMediator.completeAggregate(this);
     }
 }

Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java Sun Jan 13 07:24:37 2008
@@ -25,7 +25,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.MessageContext;
-import org.apache.synapse.SynapseException;
 import org.apache.synapse.SynapseConstants;
 import org.apache.synapse.mediators.AbstractMediator;
 import org.apache.synapse.mediators.eip.EIPUtils;
@@ -36,95 +35,51 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Collections;
 
 /**
- * This mediator will aggregate the messages flowing in to this with the specified message types
- * and build a one message
+ * Aggregate a number of messages that are determined to be for a particular group, and combine
+ * them to form a single message which is then processed through the 'onComplete' sequence. Thus
+ * an aggregator acts like a filter, and may look at a correlation XPath expression to select
+ * messages for aggregation - or look at messageSequence number properties for aggregation or
+ * let any other (i.e. non aggregatable) messages flow through
+ * An instance of this mediator will register with a Timer to be notified after a specified timeout,
+ * so that aggregations that never would complete could be timed out and cleared from memory and
+ * any fault conditions handled
  */
 public class AggregateMediator extends AbstractMediator {
 
     private static final Log log = LogFactory.getLog(AggregateMediator.class);
-
     private static final Log trace = LogFactory.getLog(SynapseConstants.TRACE_LOGGER);
 
-    /**
-     * This will hold the maximum lifetime of an aggregate and if a particular aggregate does not
-     * completed before its life time it will be invalidated and taken off from the activeAggregates
-     * map and put in to the expiredAggregates map and the invalidate sequence will be called to
-     * mediate the messages in the expired aggregate if there are any
-     */
-    private long timeToInvalidate = 0;
-
-    /**
-     * Messages coming to the aggregator will be examined for the existance of a node described
-     * in this XPATH and if it contains the XPATH pick that, if not try to find the messageSequence
-     * property for the correlation and if not pass the message through
-     */
-    private AXIOMXPath corelateExpression = null;
-
-    /**
-     * This will be used in the complete condition to complete the aggregation after waiting a
-     * specified timeout and send the messages gathered in the aggregate after aggregation
-     * if there are any messages
-     */
-    private long completeTimeout = 0;
-
-    /**
-     * Minimum number of messages required to evaluate the complete condition to true unless the
-     * aggregate has timed out with the provided timeout if there is a one
-     */
+    /** The duration as a number of milliseconds for this aggregation to complete */
+    private long completionTimeoutMillis = 0;
+    /** The minimum number of messages required to complete aggregation */
     private int minMessagesToComplete = -1;
-
-    /**
-     * Maximum number of messages that can be contained in a particular aggregation
-     */
+    /** The maximum number of messages required to complete aggregation */
     private int maxMessagesToComplete = -1;
 
     /**
-     * This will hold the implementation of the aggregation algorithm and upon validating the
-     * complete condition getAggregatedMessage method of the aggregator will be called to get
-     * the aggregated message
-     */
-    private AXIOMXPath aggregationExpression = null;
-
-    /**
-     * Holds a String reference to the Named Sequence which will be called to mediate the invalid
-     * messages coming in to the aggregator
+     * XPath that specifies a correlation expression that can be used to combine messages. An
+     * example maybe //department@id="11"
      */
-    private String invalidMsgSequenceRef = null;
-
-    /**
-     * Sequence which will be called to mediate the invalid messages coming in to aggregator
-     */
-    private SequenceMediator invalidMsgSequence = null;
-
+    private AXIOMXPath correlateExpression = null;
     /**
-     * This will be used to destroy the aggregates which were kept in the expiredAggregates map
+     * An XPath expression that may specify a selected element to be aggregated from a group of
+     * messages to create the aggregated message
+     * e.g. //getQuote/return would pick up and aggregate the //getQuote/return elements from a
+     * bunch of matching messages into one aggregated message
      */
-    private long invlidateToDestroyTime = 0;
+    private AXIOMXPath aggregationExpression = null;
 
-    /**
-     * This holds the reference sequence name of the
-     */
+    /** This holds the reference sequence name of the */
     private String onCompleteSequenceRef = null;
-
-    /**
-     *
-     */
+    /** Inline sequence definition holder that holds the onComplete sequence */
     private SequenceMediator onCompleteSequence = null;
 
-    /**
-     * This will hold the map of active aggregates at any given time
-     */
-    private Map<String, Aggregate> activeAggregates = new HashMap<String, Aggregate>();
-
-    /**
-     * This will hold the expired aggregates at any given time, these will be cleaned by a timer
-     * task time to time in order to ensure uncontrolled growth
-     */
-    private Map expiredAggregates = new HashMap();
-
-    private boolean isTimerSet = false;
+    /** The active aggregates currently being processd */
+    private Map<String, Aggregate> activeAggregates =
+        Collections.synchronizedMap(new HashMap<String, Aggregate>());
 
     public AggregateMediator() {
         try {
@@ -141,16 +96,14 @@
     }
 
     /**
-     * This is the mediate method implementation of the AggregateMediator. And this will aggregate
-     * the messages going through this mediator according to the correlation criteria and the
-     * aggregation algorithm specified to it
+     * Aggregate messages flowing through this mediator according to the correlation criteria
+     * and the aggregation algorithm specified to it
      *
      * @param synCtx - MessageContext to be mediated and aggregated
      * @return boolean true if the complete condition for the particular aggregate is validated
-     *         false if not
      */
     public boolean mediate(MessageContext synCtx) {
-        // tracing and debuggin related mediation initiation
+
         boolean traceOn = isTraceOn(synCtx);
         boolean traceOrDebugOn = isTraceOrDebugOn(traceOn);
 
@@ -162,114 +115,110 @@
             }
         }
 
-//        todo: revisit this
-//        if (!isTimerSet) {
-//            synCtx.getConfiguration().getSynapseTimer()
-//                    .schedule(new AggregateCollector(this), 5000);
-//        }
-
         try {
             Aggregate aggregate = null;
 
-            // if the corelate aggregationExpression is provided and there is a coresponding
-            // element in the message corelate the messages on that
-            if (this.corelateExpression != null
-                    && this.corelateExpression.evaluate(synCtx.getEnvelope()) != null) {
-
-                if (activeAggregates.containsKey(this.corelateExpression.toString())) {
-                    Object o = activeAggregates.get(this.corelateExpression.toString());
-                    if (o instanceof Aggregate) {
-                        aggregate = (Aggregate) o;
-                    } else {
-                        handleException("Undefined aggregate type.", synCtx);
-                    }
-                } else {
-                    aggregate = new Aggregate(this.corelateExpression.toString(),
-                            this.completeTimeout, this.minMessagesToComplete,
-                            this.maxMessagesToComplete, this);
-                    synCtx.getConfiguration().getSynapseTimer().schedule(aggregate, completeTimeout);
-                    activeAggregates.put(this.corelateExpression.toString(), aggregate);
-                }
-
-            // if the corelattion can not be found using the aggregationExpression try to find the
-            // corelation on the default criteria which is through the aggregate corelation
-            // property of the message
-            } else if (synCtx.getProperty(EIPConstants.AGGREGATE_CORELATION) != null) {
-
-                String corelation = synCtx.getProperty(
-                    EIPConstants.AGGREGATE_CORELATION) instanceof String ? synCtx.getProperty(
-                    EIPConstants.AGGREGATE_CORELATION).toString() : null;
+            // if a correlateExpression is provided and there is a coresponding
+            // element in the current message prepare to correlate the messages on that
+            if (correlateExpression != null
+                && correlateExpression.evaluate(synCtx.getEnvelope()) != null) {
 
-                // check whether the message corelation name is in the expired aggregates
-                if (expiredAggregates.containsKey(corelation)) {
+                if (activeAggregates.containsKey(correlateExpression.toString())) {
+                    aggregate = activeAggregates.get(correlateExpression.toString());
 
+                } else {
                     if (traceOrDebugOn) {
-                        traceOrDebug(traceOn, "Message with the corelation "
-                                + corelation + " expired. Invalidating the message.");
+                        traceOrDebug(traceOn, "Creating new Aggregator - expires in : " +
+                            (completionTimeoutMillis / 1000) + "secs");
                     }
 
-                    invalidate(synCtx, traceOrDebugOn, traceOn);
-                    return false;
+                    aggregate = new Aggregate(
+                        correlateExpression.toString(),
+                        completionTimeoutMillis,
+                        minMessagesToComplete,
+                        maxMessagesToComplete, this);
+                    synCtx.getConfiguration().getSynapseTimer().
+                        schedule(aggregate, completionTimeoutMillis);
+                    activeAggregates.put(correlateExpression.toString(), aggregate);
                 }
 
-                if (corelation != null) {
+            } else if (synCtx.getProperty(EIPConstants.AGGREGATE_CORRELATION) != null) {
+                // if the correlattion cannot be found using the correlateExpression then
+                // try the default which is through the AGGREGATE_CORRELATION message property
+                // which is the unique original message id of a split or iterate operation and
+                // which thus can be used to uniquely group messages into aggregates
 
-                    if (activeAggregates.containsKey(corelation)) {
+                Object o = synCtx.getProperty(EIPConstants.AGGREGATE_CORRELATION);
+                String correlation = null;
 
-                        Object o = activeAggregates.get(corelation);
-                        if (o instanceof Aggregate) {
-                            aggregate = (Aggregate) o;
-                        } else {
-                            handleException("Undefined aggregate type.", synCtx);
-                        }
+                if (o != null && o instanceof String) {
+                    correlation = (String) o;
+
+                    if (activeAggregates.containsKey(correlation)) {
+                        aggregate = activeAggregates.get(correlation);
 
                     } else {
-                        aggregate = new Aggregate(corelation, this.completeTimeout,
-                                this.minMessagesToComplete, this.maxMessagesToComplete, this);
-                        synCtx.getConfiguration().getSynapseTimer().schedule(aggregate, completeTimeout);                             
-                        activeAggregates.put(corelation, aggregate);
+                        if (traceOrDebugOn) {
+                            traceOrDebug(traceOn, "Creating new Aggregator - expires in : " +
+                                (completionTimeoutMillis / 1000) + "secs");
+                        }
+                        
+                        aggregate = new Aggregate(
+                            correlation,
+                            completionTimeoutMillis,
+                            minMessagesToComplete,
+                            maxMessagesToComplete, this);
+                        synCtx.getConfiguration().getSynapseTimer().
+                            schedule(aggregate, completionTimeoutMillis);
+                        activeAggregates.put(correlation, aggregate);
                     }
 
                 } else {
                     if (traceOrDebugOn) {
-                        traceOrDebug(traceOn,
-                            "Error in getting corelation details. Skip the aggregator.");
+                        traceOrDebug(traceOn, "Unable to find aggrgation correlation property");
                     }
                     return true;
                 }
             } else {
                 if (traceOrDebugOn) {
-                    traceOrDebug(traceOn,
-                        "Unable to find the aggregation corelation. Skip the aggregation");
+                    traceOrDebug(traceOn, "Unable to find aggrgation correlation XPath or property");
                 }
                 return true;
             }
 
             // if there is an aggregate continue on aggregation
             if (aggregate != null) {
-
-                // add the message to the aggregate and if the maximum count of the aggregate is
-                // exceeded invalidate the message
-                if (!aggregate.addMessage(synCtx)) {
-                    if (traceOrDebugOn) {
-                        traceOrDebug(traceOn, "Can not exceed aggregate " +
-                                "max message count. Invalidating message");
+                boolean collected = aggregate.addMessage(synCtx);
+                if (traceOrDebugOn) {
+                    if (collected) {
+                        traceOrDebug(traceOn, "Collected a message during aggregation");
+                        if (traceOn && trace.isTraceEnabled()) {
+                            trace.trace("Collected message : " + synCtx);
+                        }
                     }
-                    invalidate(synCtx, traceOrDebugOn, traceOn);
-                    return false;
                 }
-
-                // check the completeness of the aggregate and is completed aggregate the messages
+                
+                // check the completeness of the aggregate and if completed aggregate the messages
                 // if not completed return false and block the message sequence till it completes
-                if (aggregate.isComplete()) {
-                    return completeAggregate(aggregate);
+
+                if (aggregate.isComplete(traceOn, traceOrDebugOn, trace, log)) {
+                    if (traceOrDebugOn) {
+                        traceOrDebug(traceOn, "Aggregation completed - invoking onComplete");
+                    }
+                    completeAggregate(aggregate);
+                    
+                    if (traceOrDebugOn) {
+                        traceOrDebug(traceOn, "End : Aggregate mediator");
+                    }
+                    return true;
                 }
 
-            // if the aggregation corelation can not be found then continue the message on the
-            // normal path by returning true
             } else {
+                // if the aggregation correlation cannot be found then continue the message on the
+                // normal path by returning true
+
                 if (traceOrDebugOn) {
-                    traceOrDebug(traceOn, "Unable to find the aggregate. Skip the aggregation");
+                    traceOrDebug(traceOn, "Unable to find an aggregate for this message - skip");
                 }
                 return true;
             }
@@ -278,7 +227,6 @@
             handleException("Unable to execute the XPATH over the message", e, synCtx);
         }
 
-        // finalize tracing and debugging
         if (traceOrDebugOn) {
             traceOrDebug(traceOn, "End : Aggregate mediator");
         }
@@ -286,115 +234,104 @@
         return false;
     }
 
-    private void invalidate(MessageContext synCtx, boolean traceOrDebugOn, boolean traceOn) {
+    /**
+     * Invoked by the Aggregate objects that are timed out, to signal timeout/completion of
+     * itself
+     * @param aggregate the timed out Aggregate that holds collected messages and properties
+     */
+    public void completeAggregate(Aggregate aggregate) {
 
-        if (this.invalidMsgSequenceRef != null && synCtx.getConfiguration()
-                .getSequence(invalidMsgSequenceRef) != null) {
+        if (log.isDebugEnabled()) {
+            log.debug("Aggregation completed or timed out");
+        }
 
-            // use the sequence reference to get the sequence for mediation
-            synCtx.getConfiguration().getSequence(invalidMsgSequenceRef).mediate(synCtx);
+        // cancel the timer
+        aggregate.cancel();
 
-        } else if (this.invalidMsgSequence != null) {
+        MessageContext newSynCtx = getAggregatedMessage(aggregate);
+        if (newSynCtx == null) {
+            log.warn("An aggregation of messages timed out with no aggregated messages", null);
+            return;
+        }
 
-            // use the sequence to mediate the invalidated messages
-            invalidMsgSequence.mediate(synCtx);
+        activeAggregates.remove(aggregate);
 
-        } else {
-            if (traceOrDebugOn) {
-                traceOrDebug(traceOn, "No invalid message sequence defined. Dropping the message");
+        if ((correlateExpression != null &&
+            !correlateExpression.toString().equals(aggregate.getCorrelation())) ||
+            correlateExpression == null) {
+
+            if (onCompleteSequence != null) {
+                onCompleteSequence.mediate(newSynCtx);
+
+            } else if (onCompleteSequenceRef != null
+                && newSynCtx.getSequence(onCompleteSequenceRef) != null) {
+                newSynCtx.getSequence(onCompleteSequenceRef).mediate(newSynCtx);
+
+            } else {
+                handleException("Unable to find the sequence for the mediation " +
+                    "of the aggregated message", newSynCtx);
             }
         }
     }
 
-    public boolean completeAggregate(Aggregate aggregate) {
+    /**
+     * Get the aggregated message from the specified Aggregate instance
+     *
+     * @param aggregate the Aggregate object that holds collected messages and properties of the
+     * aggregation
+     * @return the aggregated message context
+     */
+    private MessageContext getAggregatedMessage(Aggregate aggregate) {
+
+        MessageContext newCtx = null;
+        Iterator<MessageContext> itr = aggregate.getMessages().iterator();
 
-            MessageContext newSynCtx = getAggregatedMessage(aggregate);
-            activeAggregates.remove(aggregate.getCorelation());
+        while (itr.hasNext()) {
+            MessageContext synCtx = itr.next();
+            if (newCtx == null) {
+                newCtx = synCtx;
 
-            if ((this.corelateExpression != null && !this.corelateExpression
-                    .toString().equals(aggregate.getCorelation())) ||
-                this.corelateExpression == null) {
-
-//                aggregate.setExpireTime(
-//                    System.currentTimeMillis() + this.invlidateToDestroyTime);
-                expiredAggregates.put(aggregate.getCorelation(),
-                    new Long(System.currentTimeMillis() + this.invlidateToDestroyTime));
-
-                if (this.onCompleteSequence != null) {
-                    this.onCompleteSequence.mediate(newSynCtx);
-                } else if (this.onCompleteSequenceRef != null
-                        && newSynCtx.getSequence(this.onCompleteSequenceRef) != null) {
-                    newSynCtx.getSequence(this.onCompleteSequenceRef).mediate(newSynCtx);
-                } else {
-                    handleException("Unable to find the sequence for the mediation " +
-                            "of the aggregated message", newSynCtx);
+                if (log.isDebugEnabled()) {
+                    log.debug("Generating Aggregated message from : " + newCtx.getEnvelope());
                 }
-                return false;
+
             } else {
-                return true;
-            }
-    }
+                try {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Merging message : " + synCtx.getEnvelope() + " using XPath : " +
+                            aggregationExpression);
+                    }
 
-    public MessageContext getAggregatedMessage(Aggregate aggregate) {
-        MessageContext newCtx = null;
-        Iterator itr = aggregate.getMessages().iterator();
-        while (itr.hasNext()) {
-            Object o = itr.next();
-            if (o instanceof MessageContext) {
-                MessageContext synCtx = (MessageContext) o;
-                if (newCtx == null) {
-                    newCtx = synCtx;
-                } else {
-                    try {
-                        EIPUtils.enrichEnvelope(
-                            newCtx.getEnvelope(), synCtx.getEnvelope(), this.aggregationExpression);
-                    } catch (JaxenException e) {
-                        handleException("Unable to get the aggreagated message", e, synCtx);
+                    EIPUtils.enrichEnvelope(
+                        newCtx.getEnvelope(), synCtx.getEnvelope(), aggregationExpression);
+
+                    if (log.isDebugEnabled()) {
+                        log.debug("Merged result : " + newCtx.getEnvelope());    
                     }
+
+                } catch (JaxenException e) {
+                    handleException("Error merging aggregation results using XPath : " +
+                        aggregationExpression.toString(), e, synCtx);
                 }
             }
         }
         return newCtx;
     }
 
-    public AXIOMXPath getCorelateExpression() {
-        return corelateExpression;
-    }
-
-    public void setCorelateExpression(AXIOMXPath corelateExpression) {
-        this.corelateExpression = corelateExpression;
-    }
-
-    public String getInvalidMsgSequenceRef() {
-        return invalidMsgSequenceRef;
-    }
-
-    public void setInvalidMsgSequenceRef(String invalidMsgSequenceRef) {
-        this.invalidMsgSequenceRef = invalidMsgSequenceRef;
+    public AXIOMXPath getCorrelateExpression() {
+        return correlateExpression;
     }
 
-    public SequenceMediator getInvalidMsgSequence() {
-        return invalidMsgSequence;
+    public void setCorrelateExpression(AXIOMXPath correlateExpression) {
+        this.correlateExpression = correlateExpression;
     }
 
-    public void setInvalidMsgSequence(SequenceMediator invalidMsgSequence) {
-        this.invalidMsgSequence = invalidMsgSequence;
+    public long getCompletionTimeoutMillis() {
+        return completionTimeoutMillis;
     }
 
-    public long getTimeToInvalidate() {
-        return timeToInvalidate;
-    }
-
-    public void setTimeToInvalidate(long timeToInvalidate) {
-        this.timeToInvalidate = timeToInvalidate;
-    }
-
-    public long getCompleteTimeout() {
-        return completeTimeout;
-    }
-
-    public void setCompleteTimeout(long completeTimeout) {
-        this.completeTimeout = completeTimeout;
+    public void setCompletionTimeoutMillis(long completionTimeoutMillis) {
+        this.completionTimeoutMillis = completionTimeoutMillis;
     }
 
     public int getMinMessagesToComplete() {
@@ -421,14 +358,6 @@
         this.aggregationExpression = aggregationExpression;
     }
 
-    public long getInvlidateToDestroyTime() {
-        return invlidateToDestroyTime;
-    }
-
-    public void setInvlidateToDestroyTime(long invlidateToDestroyTime) {
-        this.invlidateToDestroyTime = invlidateToDestroyTime;
-    }
-
     public String getOnCompleteSequenceRef() {
         return onCompleteSequenceRef;
     }
@@ -443,10 +372,6 @@
 
     public void setOnCompleteSequence(SequenceMediator onCompleteSequence) {
         this.onCompleteSequence = onCompleteSequence;
-    }
-
-    public Map getExpiredAggregates() {
-        return expiredAggregates;
     }
 
     public Map getActiveAggregates() {

Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/CloneMediator.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/CloneMediator.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/CloneMediator.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/CloneMediator.java Sun Jan 13 07:24:37 2008
@@ -20,9 +20,12 @@
 package org.apache.synapse.mediators.eip.splitter;
 
 import org.apache.synapse.MessageContext;
+import org.apache.synapse.ManagedLifecycle;
 import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.core.SynapseEnvironment;
 import org.apache.synapse.util.MessageHelper;
 import org.apache.synapse.mediators.AbstractMediator;
+import org.apache.synapse.mediators.base.SequenceMediator;
 import org.apache.synapse.mediators.eip.Target;
 import org.apache.synapse.mediators.eip.EIPConstants;
 import org.apache.axis2.AxisFault;
@@ -31,35 +34,34 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Iterator;
 
 /**
- * This mediator will clone the message in to different messages and mediated as specified in the
- * target elements.
+ * This mediator will clone the message into multiple messages and mediate as specified in the
+ * target elements. A target specifies or refers to a sequence or an endpoint, and optionally
+ * specifies an Action and/or To address to be set to the cloned message. The number of cloned
+ * messages created is the number of targets specified
  */
-public class CloneMediator extends AbstractMediator {
+public class CloneMediator extends AbstractMediator implements ManagedLifecycle {
 
     /**
-     * This variable specifies whether to continue the parent message (i.e. message which is
-     * subjected to cloning) or not
+     * Continue processing the parent message or not?
+     * (i.e. message which is subjected to cloning)
      */
     private boolean continueParent = false;
 
-    /**
-     * Holds the list of targets to which cloned copies of the message will be given for mediation
-     */
+    /** the list of targets to which cloned copies of the message will be given for mediation */
     private List<Target> targets = new ArrayList<Target>();
 
     /**
      * This will implement the mediate method of the Mediator interface and will provide the
-     * functionality of cloning message in to the specified targets and mediation
+     * functionality of cloning message into the specified targets and mediation
      *
      * @param synCtx - MessageContext which is subjected to the cloning
-     * @return boolean true if this needs to be further mediated (continueParent=true) false
-     *         otherwise
+     * @return boolean true if this needs to be further mediated (continueParent=true)
      */
     public boolean mediate(MessageContext synCtx) {
 
-        // tracing and debuggin related mediation initiation
         boolean traceOn = isTraceOn(synCtx);
         boolean traceOrDebugOn = isTraceOrDebugOn(traceOn);
 
@@ -73,23 +75,15 @@
 
         // get the targets list, clone the message for the number of targets and then
         // mediate the cloned messages using the targets
-        if (targets.size() != 0) {
-
-            for (int i = 0; i < targets.size(); i++) {
-                // clone message context for this target
-                MessageContext newContext = getClonedMessageContext(synCtx, i, targets.size());
-                Object o = targets.get(i);
-
-                if (o instanceof Target) {
-                    Target target = (Target) o;
-                    target.mediate(newContext);
-                }
+        Iterator<Target> iter = targets.iterator();
+        int i = 0;
+        while (iter.hasNext()) {
+            if (traceOrDebugOn) {
+                traceOrDebug(traceOn, "Submitting " + (i+1) + " of " + targets.size() +
+                    " messages for processing in parallel");
             }
-        }
 
-        // finalize tracing and debugging
-        if (traceOrDebugOn) {
-            traceOrDebug(traceOn, "End : Clone mediator");
+            iter.next().mediate(getClonedMessageContext(synCtx, i++, targets.size()));
         }
 
         // if the continuation of the parent message is stopped from here set the RESPONSE_WRITTEN
@@ -100,35 +94,40 @@
             opCtx.setProperty(Constants.RESPONSE_WRITTEN, "SKIP");
         }
 
+        // finalize tracing and debugging
+        if (traceOrDebugOn) {
+            traceOrDebug(traceOn, "End : Clone mediator");
+        }
+
         // if continue parent is true mediators after the clone will be called for the further
         // mediation of the message which is subjected for clonning (parent message)
         return continueParent;
     }
 
     /**
-     * This private method is used to clone the MC in to a new MC
+     * clone the provided message context as a new message, and mark as the messageSequence'th
+     * message context of a total of messageCount messages
      *
-     * @param synCtx          - MessageContext which is subjected to the clonning
-     * @param messageSequence - int clonning message number
-     * @param messageCount    - int complete count of cloned messages
-     * @return MessageContext which is cloned from the given parameters
+     * @param synCtx          - MessageContext which is subjected to the cloning
+     * @param messageSequence - the position of this message of the cloned set
+     * @param messageCount    - total of cloned copies
+     * @return MessageContext the cloned message context
      */
     private MessageContext getClonedMessageContext(MessageContext synCtx, int messageSequence,
-                                                   int messageCount) {
+        int messageCount) {
 
         MessageContext newCtx = null;
         try {
-            // clones the message context
             newCtx = MessageHelper.cloneMessageContext(synCtx);
+
+            // set the property MESSAGE_SEQUENCE to the MC for aggregation purposes
+            newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE,
+                String.valueOf(messageSequence) + EIPConstants.MESSAGE_SEQUENCE_DELEMITER +
+                messageCount);            
         } catch (AxisFault axisFault) {
-            handleException("Error creating a new message context", axisFault, synCtx);
+            handleException("Error cloning the message context", axisFault, synCtx);
         }
 
-        // Sets the property MESSAGE_SEQUENCE to the MC for aggragation purposes 
-        assert newCtx != null;
-        newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE, String.valueOf(messageSequence)
-            + EIPConstants.MESSAGE_SEQUENCE_DELEMITER + messageCount);
-
         return newCtx;
     }
 
@@ -154,6 +153,26 @@
 
     public void addTarget(Target target) {
         this.targets.add(target);
+    }
+
+    public void init(SynapseEnvironment se) {
+        Iterator<Target> iter = targets.iterator();
+        while (iter.hasNext()) {
+            SequenceMediator seq = iter.next().getSequence();
+            if (seq != null) {
+                seq.init(se);
+            }
+        }
+    }
+
+    public void destroy() {
+        Iterator<Target> iter = targets.iterator();
+        while (iter.hasNext()) {
+            SequenceMediator seq = iter.next().getSequence();
+            if (seq != null) {
+                seq.destroy();
+            }
+        }
     }
 
 }

Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/IterateMediator.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/IterateMediator.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/IterateMediator.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/splitter/IterateMediator.java Sun Jan 13 07:24:37 2008
@@ -41,49 +41,42 @@
 import java.util.Iterator;
 
 /**
- * This mediator will split the message in the criterian specified to it and inject in to Synapse
+ * Splits a message using an XPath expression and creates a new message to hold
+ * each resulting element. This is very much similar to the clone mediator, and
+ * hands over the newly created messages to a target for processing
  */
 public class IterateMediator extends AbstractMediator implements ManagedLifecycle {
 
-    /**
-     * This holds whether to continue mediation on the parent message or not
-     */
+    /** Continue mediation on the parent message or not? */
     private boolean continueParent = false;
 
     /**
-     * This holds whether to preserve the payload and attach the iteration child to specified node
-     * or to attach the child to the body of the envelope
+     * Preserve the payload as a template to create new messages with the selected
+     * elements with the rest of the parent, or create new message that contain only
+     * the selected element as its payload?
      */
     private boolean preservePayload = false;
 
-    /**
-     * This holds the expression which will be evaluated for the presence of elements in the
-     * mediating message for iterations
-     */
+    /** The XPath that will list the elements to be splitted */
     private AXIOMXPath expression = null;
 
     /**
-     * This holds the node to which the iteration childs will be attached. This does not have any
-     * meaning when the preservePayload is set to false
+     * An XPath expression that specifies where the splitted elements should be attached when
+     * the payload is being preserved
      */
     private AXIOMXPath attachPath = null;
 
-    /**
-     * This holds the target object for the newly created messages by the iteration
-     */
+    /** The target for the newly splitted messages */
     private Target target = null;
 
     /**
-     * This method implemenents the Mediator interface and this mediator implements the message
-     * splitting logic
+     * Splits the message by iterating over the results of the given XPath expression
      *
      * @param synCtx - MessageContext to be mediated
-     * @return boolean false if need to stop processing the parent message, boolean true if further
-     *         processing of the parent message is required
+     * @return boolean false if need to stop processing of the parent message
      */
     public boolean mediate(MessageContext synCtx) {
 
-        // initializes the logging and tracing for the mediator
         boolean traceOn = isTraceOn(synCtx);
         boolean traceOrDebugOn = isTraceOrDebugOn(traceOn);
 
@@ -104,48 +97,44 @@
             // get the iteration elements and iterate through the list,
             // this call will also detach all the iteration elements 
             List splitElements = EIPUtils.getDetachedMatchingElements(envelope, expression);
-            if (splitElements != null) {
 
-                int msgCount = splitElements.size();
-                int msgNumber = 0;
+            if (traceOrDebugOn) {
+                traceOrDebug(traceOn, "Splitting with XPath : " + expression + " resulted in " +
+                    splitElements.size() + " elements");
+            }
 
-                // if not preservePayload remove all the child elements
-                if (!preservePayload && envelope.getBody() != null) {
-                    for (Iterator itr = envelope.getBody().getChildren(); itr.hasNext();) {
-                        ((OMNode) itr.next()).detach();
-                    }
+            // if not preservePayload remove all the child elements
+            if (!preservePayload && envelope.getBody() != null) {
+                for (Iterator itr = envelope.getBody().getChildren(); itr.hasNext();) {
+                    ((OMNode) itr.next()).detach();
                 }
+            }
 
-                // iterate through the list
-                for (Object o : splitElements) {
+            int msgCount = splitElements.size();
+            int msgNumber = 0;
 
-                    // for the moment iterator will look for an OMNode as the iteration element
-                    if (!(o instanceof OMNode)) {
-                        handleException("Error in splitting the message with expression : "
-                            + expression, synCtx);
-                    }
-
-                    target.mediate(
-                        getIteratedMessage(synCtx, msgNumber, msgCount, envelope, (OMNode) o));
-                    msgNumber++;
+            // iterate through the list
+            for (Object o : splitElements) {
 
+                // for the moment iterator will look for an OMNode as the iteration element
+                if (!(o instanceof OMNode)) {
+                    handleException("Error splitting message with XPath : "
+                        + expression + " - result not an OMNode", synCtx);
                 }
 
-            } else {
-                handleException("Splitting by expression : " + expression
-                    + " did not yeild in an OMElement", synCtx);
+                if (traceOrDebugOn) {
+                    traceOrDebug(traceOn, "Submitting " + (msgNumber+1) + " of " + msgNumber +
+                        " messages for processing in parallel");
+                }
+
+                target.mediate(
+                    getIteratedMessage(synCtx, msgNumber++, msgCount, envelope, (OMNode) o));
             }
 
         } catch (JaxenException e) {
-            handleException("Error evaluating XPath expression : " + expression, e, synCtx);
-        } catch (AxisFault axisFault) {
-            handleException("Unable to split the message using the expression : " + expression,
-                axisFault, synCtx);
-        }
-
-        // finalizing the tracing and logging on the iterate mediator
-        if (traceOrDebugOn) {
-            traceOrDebug(traceOn, "End : Iterate mediator");
+            handleException("Error evaluating split XPath expression : " + expression, e, synCtx);
+        } catch (AxisFault af) {
+            handleException("Error creating an iterated copy of the message", af, synCtx);
         }
 
         // if the continuation of the parent message is stopped from here set the RESPONSE_WRITTEN
@@ -156,17 +145,22 @@
             opCtx.setProperty(Constants.RESPONSE_WRITTEN,"SKIP");
         }
 
+        if (traceOrDebugOn) {
+            traceOrDebug(traceOn, "End : Iterate mediator");
+        }
+
         // whether to continue mediation on the original message
         return continueParent;
     }
 
     /**
-     * This will create a new message context with the iteration parameters
+     * Create a new message context using the given original message context, the envelope
+     * and the split result element.
      *
      * @param synCtx    - original message context
      * @param msgNumber - message number in the iteration
-     * @param msgCount  - message count in the iteration
-     * @param envelope  - cloned envelope to be used in the iteration
+     * @param msgCount  - total number of messages in the split
+     * @param envelope  - envelope to be used in the iteration
      * @param o         - element which participates in the iteration replacement
      * @return newCtx created by the iteration
      * @throws AxisFault if there is a message creation failure
@@ -177,9 +171,12 @@
         
         // clone the message for the mediation in iteration
         MessageContext newCtx = MessageHelper.cloneMessageContext(synCtx);
+
         // set the messageSequence property for possibal aggreagtions
-        newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE,
+        newCtx.setProperty(
+            EIPConstants.MESSAGE_SEQUENCE,
             msgNumber + EIPConstants.MESSAGE_SEQUENCE_DELEMITER + msgCount);
+
         // get a clone of the envelope to be attached
         SOAPEnvelope newEnvelope = MessageHelper.cloneSOAPEnvelope(envelope);
 
@@ -188,24 +185,26 @@
         if (preservePayload) {
 
             Object attachElem = attachPath.evaluate(newEnvelope);
-            if (attachElem instanceof List) {
+            if (attachElem != null &&
+                attachElem instanceof List && !((List) attachElem).isEmpty()) {
                 attachElem = ((List) attachElem).get(0);
             }
 
             // for the moment attaching element should be an OMElement
-            if (attachElem instanceof OMElement) {
+            if (attachElem != null && attachElem instanceof OMElement) {
                 ((OMElement) attachElem).addChild(o);
             } else {
                 handleException("Error in attaching the splitted elements :: " +
                     "Unable to get the attach path specified by the expression " +
                     attachPath, synCtx);
             }
-            // if not preserve payload then attach the iteration element to the body
+
         } else if (newEnvelope.getBody() != null) {
+            // if not preserve payload then attach the iteration element to the body
             newEnvelope.getBody().addChild(o);
         }
 
-        // set the envelope ant mediate as specified in the target
+        // set the envelope and mediate as specified in the target
         newCtx.setEnvelope(newEnvelope);
 
         return newCtx;

Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/util/MessageHelper.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/util/MessageHelper.java?rev=611582&r1=611581&r2=611582&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/util/MessageHelper.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/util/MessageHelper.java Sun Jan 13 07:24:37 2008
@@ -15,7 +15,6 @@
 import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
-import org.wso2.throttle.ThrottleConstants;
 
 import java.util.Iterator;
 import java.util.ArrayList;
@@ -56,7 +55,7 @@
 
         // set the parent corelation details to the cloned MC -
         //                              for the use of aggregation like tasks
-        newCtx.setProperty(EIPConstants.AGGREGATE_CORELATION, synCtx.getMessageID());
+        newCtx.setProperty(EIPConstants.AGGREGATE_CORRELATION, synCtx.getMessageID());
 
         // copying the core parameters of the synapse MC
         newCtx.setTo(synCtx.getTo());



---------------------------------------------------------------------
To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: synapse-dev-help@ws.apache.org