You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@synapse.apache.org by in...@apache.org on 2007/10/11 03:58:35 UTC

svn commit: r583669 - in /webservices/synapse/trunk/java: ./ modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ modules/extensions/src/test/java/org/apache/synapse/mediators/throttle/ modules/extensions/src/test/java/org/apache/syn...

Author: indika
Date: Wed Oct 10 18:58:34 2007
New Revision: 583669

URL: http://svn.apache.org/viewvc?rev=583669&view=rev
Log:
Add XQuery Mediator test cases.

Improve the throttle mediator to support concurrency throttling
(the "response already committed" error still needs to be fixed).

Added:
    webservices/synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/xquery/
    webservices/synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/xquery/XQueryMediatorSerializationTest.java
    webservices/synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/xquery/XQueryMediatorTest.java
Modified:
    webservices/synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java
    webservices/synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediatorFactory.java
    webservices/synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediatorSerializer.java
    webservices/synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/throttle/ThrottleMediatorSerializationTest.java
    webservices/synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/throttle/ThrottleMediatorTest.java
    webservices/synapse/trunk/java/pom.xml
    webservices/synapse/trunk/java/repository/conf/sample/synapse_sample_600.xml
    webservices/synapse/trunk/java/repository/conf/sample/synapse_sample_601.xml

Modified: webservices/synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java?rev=583669&r1=583668&r2=583669&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java (original)
+++ webservices/synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java Wed Oct 10 18:58:34 2007
@@ -19,10 +19,7 @@
 package org.apache.synapse.mediators.throttle;
 
 import org.apache.axiom.om.OMElement;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.neethi.PolicyEngine;
-import org.apache.synapse.SynapseConstants;
 import org.apache.synapse.Mediator;
 import org.apache.synapse.MessageContext;
 import org.apache.synapse.SynapseException;
@@ -32,6 +29,10 @@
 import org.wso2.throttle.*;
 import org.wso2.throttle.factory.AccessControllerFactory;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
 
 /**
  * The Mediator for the throttling - Throtting will occur according to the ws-policy which is specified as
@@ -41,29 +42,62 @@
 
 public class ThrottleMediator extends AbstractMediator {
 
+    /** static map to share all concurrent access controllers */
+    public final static Map CONCURRENT_ACCESS_CONTROLLERS = Collections.synchronizedMap(new HashMap());
+
     /** The key for getting policy value - key refer to registry entry  */
     private String policyKey = null;
+
     /** InLine policy object - XML   */
     private OMElement inLinePolicy = null;
+
     /** The throttle - hold runtime + configuration data of throttle  */
     private Throttle throttle = null;
+
     /** The reference to the sequence which will execute when access deny*/
     private String onReject = null;
+
     /** The in-line sequence which will execute when access deny*/
     private Mediator onRejectMediator = null;
+
     /** The reference to the sequence which will execute when access accept */
     private String onAccept  = null;
+
     /** The in-line sequence which will execute when access accept */
     private Mediator onAcceptMediator = null;
 
+    /** The concurrent access control group id */
+    private String ID ;
+
+    /** Whether this mediator is the initiator of the concurrent access controller */
+    private boolean isInitiator = false;
+
+    /** Lock used to ensure thread-safe initialization of the throttle and lookup of the concurrent access controller */
+    private final Object throttleLock = new Object();
+
+    /** The ConcurrentAccessController cache */
+    private ConcurrentAccessController concurrentAccessController;
+
+    /* Whether debug logging was enabled when this mediator was constructed */
+    private boolean debugOn;
+
+    public ThrottleMediator() {
+        this.debugOn = log.isDebugEnabled();
+    }
+
     public boolean mediate(MessageContext synCtx) {
         boolean shouldTrace = shouldTrace(synCtx.getTracingState());
         try {
             if (shouldTrace) {
                 trace.trace("Start : Throttle mediator");
             }
-            //init method to init throttle
-            init(synCtx, shouldTrace);
+            synchronized (throttleLock) {
+                //init method to init throttle
+                initThrottle(synCtx, shouldTrace);
+                if (concurrentAccessController == null && ID != null) {
+                    lookupConcurrentAccessController();
+                }
+            }
             // check access allow or not
             return canAccess(synCtx, shouldTrace);
         } finally {
@@ -77,30 +111,115 @@
      * To check whether allow access or not for caller
      * Current Implementaion only support IP Based Throttling
      *
-     * @param synContext
+     * @param synContext Current Message Context
+     * @param shouldTrace indicates whether tracing is enabled or not
      * @return boolean which indicate whether this caller can or not access
      */
     protected boolean canAccess(MessageContext synContext, boolean shouldTrace) {
+
+        boolean isResponse = synContext.isResponse();
+        // do the concurrent throttling
+        boolean canAccess = doConcurrentThrottling(isResponse, shouldTrace);
+        if (canAccess) { // if the access is success then
+            if (debugOn) {
+                log.debug("Access success from concurrent throttlling");
+            }
+            if (!isResponse) {
+                // do the normal throttling 
+                canAccess = doThrottling(synContext, shouldTrace);
+            }
+        } else {
+            if (debugOn) {
+                log.debug("Access deny from concurrent throttlling");
+            }
+        }
+
+        if (canAccess) {
+            if (onAccept != null) {
+                Mediator mediator = synContext.getSequence(onAccept);
+                if (mediator != null) {
+                    return mediator.mediate(synContext);
+                } else {
+                    return true;
+                }
+            } else if (onAcceptMediator != null) {
+                return onAcceptMediator.mediate(synContext);
+            } else {
+                return true;
+            }
+        } else {
+            if (onReject != null) {
+                Mediator mediator = synContext.getSequence(onReject);
+                if (mediator != null) {
+                    return mediator.mediate(synContext);
+                } else {
+                    return false;
+                }
+            } else if (onRejectMediator != null) {
+                return onRejectMediator.mediate(synContext);
+            } else {
+                return false;
+            }
+        }
+    }
+
+    /**
+     * Performs concurrency throttling.
+     * @param isResponse indicates whether the message flow is OUT (true) or IN (false)
+     * @param shouldTrace indicates whether tracing is ON or OFF
+     * @return true if the message can continue, otherwise false
+     */
+    private boolean doConcurrentThrottling(boolean isResponse, boolean shouldTrace) {
+
+        boolean canAccess = true;
+        if (concurrentAccessController != null) {
+            if (!isResponse) {
+                if (debugOn) {
+                    log.debug("Incoming message process through the ConcurrentThrottlling");
+                }
+                canAccess = concurrentAccessController.beforeAccess();
+                if (debugOn) {
+                    if (!canAccess) {
+                        log.debug("Access has currently been denied since allowed maximum concurrent access has exceeded");
+                    }
+                }
+            } else {
+                if (debugOn) {
+                    log.debug("Outcoming message process through the ConcurrentThrottlling");
+                }
+                canAccess = concurrentAccessController.afterAccess();
+            }
+        }
+        return canAccess;
+    }
+
+    /**
+     * Processes the message through the IP-based throttle.
+     * @param synContext Current Message
+     * @param shouldTrace Indicates whether tracing is ON or OFF
+     * @return true if the message can continue, otherwise false
+     */
+    private boolean doThrottling(MessageContext synContext, boolean shouldTrace) {
+
         if (throttle == null) {
-            if (log.isDebugEnabled()) {
+            if (debugOn) {
                 log.debug("Can not find a throttle");
             }
             return true;
         }
-        boolean canAccess = true;
         org.apache.axis2.context.MessageContext axis2MessageContext
-                = ((Axis2MessageContext) synContext).getAxis2MessageContext();
+            = ((Axis2MessageContext) synContext).getAxis2MessageContext();
         //IP based throttling
         Object remoteIP = axis2MessageContext.getProperty(
-                org.apache.axis2.context.MessageContext.REMOTE_ADDR);
+            org.apache.axis2.context.MessageContext.REMOTE_ADDR);
         if (remoteIP == null) {
             if (shouldTrace) {
                 trace.trace("The IP Address of the caller is cannnot find- The Throttling will" +
-                        "not occur");
+                    "not occur");
             }
-            if (log.isDebugEnabled()) {
+            if (debugOn) {
                 log.debug("The IP address of the caller can not find - Currently only support caller-IP base"
-                        + "access control - Thottling will not happen ");
+                    + "access control - Thottling will not happen ");
             }
             return true;
         } else {
@@ -108,62 +227,40 @@
                 trace.trace("The IP Address of the caller :" + remoteIP);
             }
             ThrottleContext throttleContext
-                    = throttle.getThrottleContext(ThrottleConstants.IP_BASED_THROTTLE_KEY);
+                = throttle.getThrottleContext(ThrottleConstants.IP_BASED_THROTTLE_KEY);
             if (throttleContext == null) {
                 if (log.isDebugEnabled()) {
-                    log.debug("Can not find a configuartion for the IP Based Throttle");
+                    log.debug("Can not find a configuartion context for the IP Based Throttle");
                 }
                 return true;
             }
             try {
                 AccessController accessControler = AccessControllerFactory.createAccessControler(
-                        ThrottleConstants.IP_BASE);
-                canAccess = accessControler.canAccess(throttleContext, remoteIP);
+                    ThrottleConstants.IP_BASE);
+                boolean canAccess = accessControler.canAccess(throttleContext, remoteIP);
                 if (!canAccess) {
                     String msg = "Access has currently been denied by" +
-                            " the IP_BASE throttle for the IP :\t" + remoteIP;
+                        " the IP_BASE throttle for the IP :\t" + remoteIP;
                     if (shouldTrace) {
                         trace.trace(msg);
                     }
                     if (log.isDebugEnabled()) {
                         log.debug(msg);
                     }
+                } else {
+                    if(debugOn){
+                       log.debug("Access was successful ");
+                   }
                 }
+                return canAccess;
             }
             catch (ThrottleException e) {
                 handleException("Error occur during throttling ", e);
             }
         }
-        if (canAccess) {
-            if (onAccept != null) {
-                Mediator mediator = synContext.getSequence(onAccept);
-                if (mediator != null) {
-                    return mediator.mediate(synContext);
-                } else {
-                    return true;
-                }
-            } else if (onAcceptMediator != null) {
-                return onAcceptMediator.mediate(synContext);
-            } else {
-                return true;
-            }
-        } else {
-            if (onReject != null) {
-                Mediator mediator = synContext.getSequence(onReject);
-                if (mediator != null) {
-                    return mediator.mediate(synContext);
-                } else {
-                    return false;
-                }
-            } else if (onRejectMediator != null) {
-                return onRejectMediator.mediate(synContext);
-            } else {
-                return false;
-            }
-        }
+        return true;
     }
 
-
     /**
      * To init throttle with the policy
      * If the policy is defined as a Registry key ,then Policy will only process after it has expired
@@ -171,86 +268,143 @@
      * If the policy is defined as a Inline XML ,then only one time policy will process and any runtime
      * changes to the policy will not reflect
      *
-     * @param synContext
+     * @param synContext Current Message
+     * @param shouldTrace Indicates trace is ON or OFF
      */
-    protected void init(MessageContext synContext, boolean shouldTrace) {
+    protected void initThrottle(MessageContext synContext, boolean shouldTrace) {
 
         boolean reCreate = false; // It is not need to recreate ,if property is not dyanamic
         OMElement policyOmElement = null;
+
         if (policyKey != null) {
+
             Entry entry = synContext.getConfiguration().getEntryDefinition(policyKey);
             if (entry == null) {
-                if (log.isDebugEnabled()) {
+                if (debugOn) {
                     log.debug("Cant not find a Entry from the Entry key " + policyKey);
                 }
                 return;
             }
+
             Object entryValue = entry.getValue();
             if (entryValue == null) {
-                if (log.isDebugEnabled()) {
+                if (debugOn) {
                     log.debug("Cant not find a Policy(Enrty value) from the Entry key " + policyKey);
                 }
                 return;
             }
+
             if (!(entryValue instanceof OMElement)) {
-                if (log.isDebugEnabled()) {
+                if (debugOn) {
                     log.debug("Entry value which is refered from the key " + policyKey + " is Incompatible " +
-                            "for the policy element");
+                        "for the policy element");
                 }
                 return;
             }
+
             // if entry is dynamic, need to check wheather updated or not
             if ((!entry.isCached() || entry.isExpired())) {
                 reCreate = true;
             }
             policyOmElement = (OMElement) entryValue;
+
         } else if (inLinePolicy != null) {
             policyOmElement = inLinePolicy;
         }
+
         if (policyOmElement == null) {
-            if (log.isDebugEnabled()) {
-                log.debug("Cant not find a Policy - Throttling will not occur");
+            if (debugOn) {
+                log.debug("Can not find a Policy - Throttling will not occur");
             }
             return;
         }
+
         if (shouldTrace) {
             trace.trace("The Throttle Policy :" + policyOmElement.toString());
         }
         if (!reCreate) {
             //The first time creation
             if (throttle == null) {
-                createThrottleMetaData(policyOmElement);
+                createThrottleMetaData(policyOmElement, synContext.isResponse());
             }
         } else {
-            createThrottleMetaData(policyOmElement);
+            createThrottleMetaData(policyOmElement, synContext.isResponse());
         }
-
     }
 
     /**
      * To create the Throttle from the policy element
      *
      * @param policyOmElement - valid throttle policy
+     * @param isResponse - Indicates whether current message flow is IN or OUT
      */
-    protected void createThrottleMetaData(OMElement policyOmElement) {
+    protected void createThrottleMetaData(OMElement policyOmElement, boolean isResponse) {
         try {
-            if (log.isDebugEnabled()) {
+            if (debugOn) {
                 log.debug("Creating a new throttle configuration by parsing the Policy");
             }
             throttle = ThrottlePolicyProcessor
-                    .processPoclicy(PolicyEngine.getPolicy(policyOmElement));
+                .processPolicy(PolicyEngine.getPolicy(policyOmElement));
+
+            //set the concurrent access controller
+            if (ID != null) {
+                if (!CONCURRENT_ACCESS_CONTROLLERS.containsKey(ID)) {
+                    reCreateConcurrentAccessController(isResponse);
+                } else {
+                    if (isInitiator) {
+                        reCreateConcurrentAccessController(isResponse);
+                    } else {
+                        lookupConcurrentAccessController();
+                    }
+                }
+            }
         }
         catch (ThrottleException e) {
             handleException("Error during processing the thorttle policy  " + e.getMessage());
         }
     }
 
+    /**
+     * Creates a ConcurrentAccessController if the current message is an incoming message
+     *
+     * @param isResponse true if the current message flow is out
+     */
+    private void reCreateConcurrentAccessController(boolean isResponse) {
+
+        if (!isResponse) {
+            concurrentAccessController = throttle.getConcurrentAccessController();
+            if (concurrentAccessController != null) {
+                isInitiator = true;  // frist time creation of concurrent access controller
+                if (CONCURRENT_ACCESS_CONTROLLERS.containsKey(ID)) {
+                    if (debugOn) {
+                        log.debug("Removing the ConcurrentAccessControler with Id " + ID);
+                    }
+                    CONCURRENT_ACCESS_CONTROLLERS.remove(ID);  // removing the old access controller
+                }
+                if (debugOn) {
+                    log.debug("Initiating ConcurrentAccessControler for throttle group id " + ID);
+                }
+                CONCURRENT_ACCESS_CONTROLLERS.put(ID, concurrentAccessController);
+            }
+        }
+    }
+
+    /**
+     * Looks up the ConcurrentAccessController which has been initiated by another throttle mediator
+     */
+    private void lookupConcurrentAccessController() {
+        log.info("ConcurrentAccessController has already defined for id :" + ID);
+        concurrentAccessController =
+            (ConcurrentAccessController) CONCURRENT_ACCESS_CONTROLLERS.get(ID);
+
+    }
+
     private void handleException(String msg) {
         log.error(msg);
         throw new SynapseException(msg);
     }
 
-    private void handleException(String msg, Exception e) {         
+    private void handleException(String msg, Exception e) {
         log.error(e);
         throw new SynapseException(msg);
     }
@@ -260,7 +414,7 @@
     }
 
     /**
-     * To get the policy key - The key for which lookup from the registry
+     * To get the policy key - the key which will be used to look up the policy from the registry
      *
      * @return String
      */
@@ -271,7 +425,7 @@
     /**
      * To set the policy key - The key for which lookup from the registry
      *
-     * @param policyKey
+     * @param policyKey Key for picking policy from the registry
      */
     public void setPolicyKey(String policyKey) {
         this.policyKey = policyKey;
@@ -289,7 +443,7 @@
     /**
      * setting throttle policy which has defined as InLineXML
      *
-     * @param inLinePolicy
+     * @param inLinePolicy Inline policy
      */
     public void setInLinePolicy(OMElement inLinePolicy) {
         this.inLinePolicy = inLinePolicy;
@@ -325,5 +479,13 @@
 
     public void setOnAcceptMediator(Mediator onAcceptMediator) {
         this.onAcceptMediator = onAcceptMediator;
+    }
+
+    public String getID() {
+        return ID;
+    }
+
+    public void setID(String ID) {
+        this.ID = ID;
     }
 }

Modified: webservices/synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediatorFactory.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediatorFactory.java?rev=583669&r1=583668&r2=583669&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediatorFactory.java (original)
+++ webservices/synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediatorFactory.java Wed Oct 10 18:58:34 2007
@@ -20,12 +20,13 @@
 
 import org.apache.axiom.om.OMAttribute;
 import org.apache.axiom.om.OMElement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.Mediator;
 import org.apache.synapse.config.xml.AbstractMediatorFactory;
-import org.apache.synapse.config.xml.XMLConfigConstants;
 import org.apache.synapse.config.xml.SequenceMediatorFactory;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.config.xml.XMLConfigConstants;
+
 import javax.xml.namespace.QName;
 
 
@@ -63,12 +64,16 @@
                     throttleMediator.setInLinePolicy(inLine);
                 }
             }
-        } else {
-            handleException("Throttle Mediator must have a policy");
-        }
+        } 
         // after successfully creating the mediator
         // set its common attributes such as tracing etc
         processTraceState(throttleMediator,elem);
+
+        String id = elem.getAttributeValue(new QName(XMLConfigConstants.NULL_NAMESPACE, "id"));
+        if (id != null) {
+            throttleMediator.setID(id.trim());
+        }
+
         SequenceMediatorFactory mediatorFactory = new SequenceMediatorFactory();
         OMAttribute onReject = elem.getAttribute(
                 new QName(XMLConfigConstants.NULL_NAMESPACE, XMLConfigConstants.ONREJECT));

Modified: webservices/synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediatorSerializer.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediatorSerializer.java?rev=583669&r1=583668&r2=583669&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediatorSerializer.java (original)
+++ webservices/synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediatorSerializer.java Wed Oct 10 18:58:34 2007
@@ -24,10 +24,10 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.Mediator;
-import org.apache.synapse.mediators.base.SequenceMediator;
 import org.apache.synapse.config.xml.AbstractMediatorSerializer;
-import org.apache.synapse.config.xml.XMLConfigConstants;
 import org.apache.synapse.config.xml.SequenceMediatorSerializer;
+import org.apache.synapse.config.xml.XMLConfigConstants;
+import org.apache.synapse.mediators.base.SequenceMediator;
 
 /**
  * The Serializer for Throttle Mediator  saving throttle instance
@@ -60,6 +60,13 @@
             }
         }
         saveTracingState(throttle, throttleMediator);
+
+        String id = throttleMediator.getID();
+        if(id != null){
+            throttle.addAttribute(fac.createOMAttribute(
+                    "id", nullNS, id));
+        }
+        
         String onReject = throttleMediator.getOnReject();
         if (onReject != null) {
             throttle.addAttribute(fac.createOMAttribute(XMLConfigConstants.ONREJECT, nullNS, onReject));

Modified: webservices/synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/throttle/ThrottleMediatorSerializationTest.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/throttle/ThrottleMediatorSerializationTest.java?rev=583669&r1=583668&r2=583669&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/throttle/ThrottleMediatorSerializationTest.java (original)
+++ webservices/synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/throttle/ThrottleMediatorSerializationTest.java Wed Oct 10 18:58:34 2007
@@ -37,7 +37,7 @@
     }
 
     public void testThrottleMediatorSerializationSenarioOne() throws Exception {
-        String inputXml = "<throttle:throttle xmlns:throttle=\"http://ws.apache.org/ns/synapse/throttle\" xmlns=\"http://ws.apache.org/ns/synapse\" >" +
+        String inputXml = "<throttle:throttle id=\"A\" xmlns:throttle=\"http://ws.apache.org/ns/synapse/throttle\" xmlns=\"http://ws.apache.org/ns/synapse\" >" +
                 "<policy key=\"thottleKey\"/></throttle:throttle>";
         assertTrue(serialization(inputXml, throttleMediatorFactory, throttleMediatorSerializer));
         assertTrue(serialization(inputXml, throttleMediatorSerializer));

Modified: webservices/synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/throttle/ThrottleMediatorTest.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/throttle/ThrottleMediatorTest.java?rev=583669&r1=583668&r2=583669&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/throttle/ThrottleMediatorTest.java (original)
+++ webservices/synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/throttle/ThrottleMediatorTest.java Wed Oct 10 18:58:34 2007
@@ -54,6 +54,7 @@
     private static final String POLICY = " <wsp:Policy xmlns:wsp=\"http://schemas.xmlsoap.org/ws/2004/09/policy\"\n" +
             "                xmlns:throttle=\"http://www.wso2.org/products/wso2commons/throttle\">\n" +
             "        <throttle:ThrottleAssertion>\n" +
+            "            <throttle:MaximumConcurrentAccess>10</throttle:MaximumConcurrentAccess>\n" +           
             "            <wsp:All>\n" +
             "                <throttle:ID throttle:type=\"IP\">Other</throttle:ID>\n" +
             "                <wsp:ExactlyOne>\n" +
@@ -297,7 +298,7 @@
 
             try {
                 throttle = ThrottlePolicyProcessor
-                        .processPoclicy(PolicyEngine.getPolicy(policyOmElement));
+                        .processPolicy(PolicyEngine.getPolicy(policyOmElement));
             }
             catch (ThrottleException e) {
 

Added: webservices/synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/xquery/XQueryMediatorSerializationTest.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/xquery/XQueryMediatorSerializationTest.java?rev=583669&view=auto
==============================================================================
--- webservices/synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/xquery/XQueryMediatorSerializationTest.java (added)
+++ webservices/synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/xquery/XQueryMediatorSerializationTest.java Wed Oct 10 18:58:34 2007
@@ -0,0 +1,57 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.mediators.xquery;
+
+import org.apache.synapse.mediators.AbstractTestCase;
+
+/**
+ *
+ */
+
+public class XQueryMediatorSerializationTest extends AbstractTestCase {
+    private XQueryMediatorFactory factory;
+    private XQueryMediatorSerializer serializer;
+
+    public XQueryMediatorSerializationTest() {
+        factory = new XQueryMediatorFactory();
+        serializer = new XQueryMediatorSerializer();
+    }
+
+    public void testXQueryMediatorSerializationSenarioOne() throws Exception {
+        String inputXml = "<xquery:xquery xmlns:xquery=\"http://ws.apache.org/ns/synapse/xquery\" xmlns=\"http://ws.apache.org/ns/synapse\" key=\"querykey\" target=\"target\">" +
+                          "<dataSource>" +
+                          "<property name=\"username\" value=\"valueone\" />" +
+                          "</dataSource>" +
+                          "<variable name=\"b1\" value=\"23\" type=\"INT\" />" +
+                          "<variable name=\"b1\" value=\"true\" type=\"BOOLEAN\" />" +
+                          "<variable name=\"b1\" value=\"23.44\" type=\"DOUBLE\" />" +
+                          "<variable name=\"b1\" value=\"23\" type=\"LONG\" />" +
+                          "<variable name=\"b1\" value=\"23.1\" type=\"FLOAT\" />" +
+                          "<variable name=\"b1\" value=\"23\" type=\"SHORT\" />" +
+                          "<variable name=\"b1\" value=\"23\" type=\"BYTE\" />" +
+                          "<variable name=\"b1\" value=\"synapse\" type=\"STRING\" />" +
+                          "<variable name=\"b1\" key=\"xmlkey\" type=\"DOCUMENT\" />" +
+                          "<variable name=\"b1\" key=\"xmlkey\" type=\"DOCUMENT_ELEMENT\" />" +
+                          "<variable name=\"b1\" key=\"xmlkey\" type=\"ELEMENT\" />" +
+                          "</xquery:xquery>";
+        assertTrue(serialization(inputXml, factory, serializer));
+
+    }
+
+}

Added: webservices/synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/xquery/XQueryMediatorTest.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/xquery/XQueryMediatorTest.java?rev=583669&view=auto
==============================================================================
--- webservices/synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/xquery/XQueryMediatorTest.java (added)
+++ webservices/synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/xquery/XQueryMediatorTest.java Wed Oct 10 18:58:34 2007
@@ -0,0 +1,269 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.mediators.xquery;
+
+import junit.framework.TestCase;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.mediators.TestUtils;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import java.util.List;
+import java.util.ArrayList;
+import java.io.StringReader;
+
+import net.sf.saxon.javax.xml.xquery.XQItemType;
+
+/**
+ * Unit tests for {@link XQueryMediator}. Each test configures a mediator with an inline
+ * XQuery source (plus external variables where the query declares them), runs it against
+ * a test message context and then inspects the resulting message body.
+ */
+public class XQueryMediatorTest extends TestCase {
+
+    /** Bookstore catalogue used as the message payload in testQueryWithPayload. */
+    public final static String sampleXml = "<bookstore><book category=\"COOKING\"> <title lang=\"en\">Everyday Italian</title>\n" +
+                                           "  <author>Giada De Laurentiis</author>\n" +
+                                           "  <year>2005</year>\n" +
+                                           "  <price>30.00</price>\n" +
+                                           "\n" +
+                                           "</book>\n" +
+                                           "\n" +
+                                           "<book category=\"CHILDREN\">\n" +
+                                           "  <title lang=\"en\">Harry Potter</title>\n" +
+                                           "  <author>J K. Rowling</author>\n" +
+                                           "  <year>2005</year>\n" +
+                                           "  <price>29.99</price>\n" +
+                                           "</book>\n" +
+                                           "\n" +
+                                           "<book category=\"WEB\">\n" +
+                                           "  <title lang=\"en\">XQuery Kick Start</title>\n" +
+                                           "  <author>James McGovern</author>\n" +
+                                           "  <author>Per Bothner</author>\n" +
+                                           "  <author>Kurt Cagle</author>\n" +
+                                           "  <author>James Linn</author>\n" +
+                                           "  <author>Vaidyanathan Nagarajan</author>\n" +
+                                           "\n" +
+                                           "  <year>2003</year>\n" +
+                                           "  <price>49.99</price>\n" +
+                                           "</book>\n" +
+                                           "\n" +
+                                           "<book category=\"WEB\">\n" +
+                                           "  <title lang=\"en\">Learning XML</title>\n" +
+                                           "  <author>Erik T. Ray</author>\n" +
+                                           "  <year>2003</year>\n" +
+                                           "\n" +
+                                           "  <price>39.95</price>\n" +
+                                           "</book>\n" +
+                                           "\n" +
+                                           "</bookstore>";
+
+    /** Price-check request used as the message payload in testQueryWithPayloadTwo. */
+    public final static String sampleXml2 = "<m0:CheckPriceRequest xmlns:m0=\"http://www.apache-synapse.org/test\">\n" +
+                                            "    <m0:Code>IBM</m0:Code>\n" +
+                                            "</m0:CheckPriceRequest>";
+
+    /** Quote response used as the message payload in testQueryWithPayloadThree and Four. */
+    public final static String sampleXml3 = "<m0:return xmlns:m0=\"http://services.samples/xsd\">\n" +
+                                            "\t<m0:symbol>IBM</m0:symbol>\n" +
+                                            "\t<m0:last>122222</m0:last>\n" +
+                                            "</m0:return>";
+
+    /** Commission table passed as a second document variable in testQueryWithPayloadFour. */
+    public final static String externalXMl = "<commission>\n" +
+                                             "    <vendor symbol=\"IBM\">44444</vendor>\n" +
+                                             "    <vendor symbol=\"MSFT\">55555</vendor>\n" +
+                                             "    <vendor symbol=\"SUN\">66666</vendor>\n" +
+                                             "</commission>";
+
+    /** Binds one external variable of every base type and checks the int and boolean results. */
+    public void testQueryWithAll() throws Exception {
+        MessageContext mc = TestUtils.getTestContext("<foo/>", null);
+        XQueryMediator mediator = new XQueryMediator();
+        mediator.setQuerySource("declare variable $intVar as xs:int external;" +
+                             "declare variable $boolVar as xs:boolean external;" +
+                             "declare variable $byteVar as xs:byte external;" +
+                             "declare variable $longVar as xs:long external;" +
+                             "declare variable $doubleVar as xs:double external;" +
+                             "declare variable $shortVar as xs:short external;" +
+                             "declare variable $floatVar as xs:float external;" +
+                             "declare variable $stringVar as xs:string external;" +
+                             "document { " +
+                             "<a xmlns='http://a/uri' z:in='out' xmlns:z='http://z/uri'>" +
+                             "<b>{$intVar+2}<e>{$boolVar}</e>" +
+                             "<all>" +
+                             "{$byteVar}," +
+                             "{$shortVar}," +
+                             "{$doubleVar}," +
+                             "{$longVar}," +
+                             "{$floatVar}," +
+                             "{$stringVar}," +
+                             "</all></b></a> }");
+        List variables = new ArrayList();
+        variables.add(baseVariable("intVar", XQItemType.XQBASETYPE_INT, new Integer(8)));
+        variables.add(baseVariable("boolVar", XQItemType.XQBASETYPE_BOOLEAN, Boolean.TRUE));
+        variables.add(baseVariable("doubleVar", XQItemType.XQBASETYPE_DOUBLE, new Double(23.33)));
+        variables.add(baseVariable("floatVar", XQItemType.XQBASETYPE_FLOAT, new Float(23.33)));
+        variables.add(baseVariable("shortVar", XQItemType.XQBASETYPE_SHORT, new Short((short) 327)));
+        variables.add(baseVariable("byteVar", XQItemType.XQBASETYPE_BYTE, new Byte((byte) 3)));
+        variables.add(baseVariable("longVar", XQItemType.XQBASETYPE_LONG, new Long(334)));
+        variables.add(baseVariable("stringVar", XQItemType.XQBASETYPE_STRING, "synapse"));
+        mediator.addAllVariables(variables);
+        assertTrue(mediator.mediate(mc));
+        // body -> <a> -> <b>: the text of <b> is $intVar + 2, its first child <e> holds $boolVar
+        OMElement b = mc.getEnvelope().getBody().getFirstElement().getFirstElement();
+        assertEquals("10", b.getText());
+        assertEquals("true", b.getFirstElement().getText());
+    }
+
+    /** Selects the book titles from the payload and expects the first title in the body. */
+    public void testQueryWithPayload() throws Exception {
+        MessageContext mc = TestUtils.getTestContext(sampleXml, null);
+        XQueryMediator mediator = new XQueryMediator();
+        List variables = new ArrayList();
+        variables.add(payloadVariable());
+        mediator.addAllVariables(variables);
+        mediator.setQuerySource("declare variable $payload as document-node() external;" +
+                             "$payload//bookstore/book/title");
+        assertTrue(mediator.mediate(mc));
+        assertEquals("Everyday Italian", mc.getEnvelope().getBody().getFirstElement().getText());
+    }
+
+    /** Builds a getQuote request whose symbol is copied from the payload's Code element. */
+    public void testQueryWithPayloadTwo() throws Exception {
+        MessageContext mc = TestUtils.getTestContext(sampleXml2, null);
+        XQueryMediator mediator = new XQueryMediator();
+        List variables = new ArrayList();
+        variables.add(payloadVariable());
+        mediator.addAllVariables(variables);
+        mediator.setQuerySource("declare namespace m0=\"http://www.apache-synapse.org/test\"; " +
+                             "declare variable $payload as document-node() external;" +
+                             "<m:getQuote xmlns:m=\"http://services.samples/xsd\">\n" +
+                             "<m:request>" +
+                             "   <m:symbol>{$payload//m0:CheckPriceRequest/m0:Code/child::text()}" +
+                             "   </m:symbol></m:request>\n" +
+                             "</m:getQuote> ");
+        assertTrue(mediator.mediate(mc));
+        // body -> getQuote -> request -> symbol
+        assertEquals("IBM", mc.getEnvelope().getBody().getFirstElement().
+                getFirstElement().getFirstElement().getText());
+    }
+
+    /** Builds a CheckPriceResponse from the payload and verifies the copied code and price. */
+    public void testQueryWithPayloadThree() throws Exception {
+        MessageContext mc = TestUtils.getTestContext(sampleXml3, null);
+        XQueryMediator mediator = new XQueryMediator();
+        List variables = new ArrayList();
+        variables.add(payloadVariable());
+        mediator.addAllVariables(variables);
+        mediator.setQuerySource("declare namespace m0=\"http://services.samples/xsd\";" +
+                             " declare variable $payload as document-node() external;\n" +
+                             "<m:CheckPriceResponse xmlns:m=\"http://www.apache-synapse.org/test\">\n" +
+                             "\t<m:Code>{$payload//m0:return/m0:symbol/child::text()}</m:Code>\n" +
+                             "\t<m:Price>{$payload//m0:return/m0:last/child::text()}</m:Price>\n" +
+                             "</m:CheckPriceResponse>");
+        assertTrue(mediator.mediate(mc));
+        // check the values copied from the payload, not only that mediation succeeded
+        OMElement response = mc.getEnvelope().getBody().getFirstElement();
+        assertEquals("IBM", response.getFirstElement().getText());
+        OMElement price = response.getFirstChildWithName(
+                new QName("http://www.apache-synapse.org/test", "Price"));
+        assertNotNull(price);
+        assertEquals("122222", price.getText());
+    }
+
+    /** Joins the payload with a second document variable holding the commission table. */
+    public void testQueryWithPayloadFour() throws Exception {
+        MessageContext mc = TestUtils.getTestContext(sampleXml3, null);
+        XQueryMediator mediator = new XQueryMediator();
+        List variables = new ArrayList();
+        variables.add(payloadVariable());
+        MediatorCustomVariable commission = new MediatorCustomVariable(new QName("commission"));
+        commission.setType(XQItemType.XQITEMKIND_DOCUMENT);
+        commission.setRegKey("file:key");
+        commission.setValue(createOMElement(externalXMl));
+        variables.add(commission);
+        mediator.addAllVariables(variables);
+        mediator.setQuerySource(" declare namespace m0=\"http://services.samples/xsd\";\n" +
+                             " declare variable $payload as document-node() external;\n" +
+                             " declare variable $commission as document-node() external;\n" +
+                             " <m0:return xmlns:m0=\"http://services.samples/xsd\">\n" +
+                             "  \t<m0:symbol>{$payload//m0:return/m0:symbol/child::text()}" +
+                             "   </m0:symbol>\n" +
+                             "  \t<m0:last>{$payload//m0:return/m0:last/child::text()+ $commission//commission/vendor[@symbol=$payload//m0:return/m0:symbol/child::text()]}</m0:last>\n" +
+                             " </m0:return>");
+        assertTrue(mediator.mediate(mc));
+        // body -> return -> symbol; the symbol is copied straight from the payload
+        assertEquals("IBM", mc.getEnvelope().getBody().getFirstElement().
+                getFirstElement().getText());
+    }
+
+    /** Returns the squares of 1 to 10 and expects the first one in the body's first element. */
+    public void testQueryReturnInt() throws Exception {
+        MessageContext mc = TestUtils.getTestContext("<foo/>", null);
+        XQueryMediator mediator = new XQueryMediator();
+        mediator.setQuerySource("for $n in 1 to 10 return $n*$n");
+        assertTrue(mediator.mediate(mc));
+        assertEquals("1", mc.getEnvelope().getBody().getFirstElement().getText());
+    }
+
+    /** Returns the external boolean variable unchanged and expects "true" in the body. */
+    public void testQueryReturnBoolean() throws Exception {
+        MessageContext mc = TestUtils.getTestContext("<foo/>", null);
+        XQueryMediator mediator = new XQueryMediator();
+        mediator.setQuerySource("declare variable $boolVar as xs:boolean external; $boolVar");
+        List variables = new ArrayList();
+        variables.add(baseVariable("boolVar", XQItemType.XQBASETYPE_BOOLEAN, Boolean.TRUE));
+        mediator.addAllVariables(variables);
+        assertTrue(mediator.mediate(mc));
+        assertEquals("true", mc.getEnvelope().getBody().getFirstElement().getText());
+    }
+
+    /**
+     * Parses the given XML text and returns its document element; an XMLStreamException
+     * raised while doing so is rethrown wrapped in a RuntimeException.
+     */
+    protected OMElement createOMElement(String xml) {
+        try {
+            return new StAXOMBuilder(XMLInputFactory.newInstance().createXMLStreamReader(
+                    new StringReader(xml))).getDocumentElement();
+        } catch (XMLStreamException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Creates a base-type query variable from a name, an XQItemType constant and a value. */
+    private static MediatorVariable baseVariable(String name, int type, Object value) {
+        MediatorVariable variable = new MediatorBaseVariable(new QName(name));
+        variable.setType(type);
+        variable.setValue(value);
+        return variable;
+    }
+
+    /** Creates the document-typed custom variable named "payload" shared by the payload tests. */
+    private static MediatorVariable payloadVariable() {
+        MediatorVariable variable = new MediatorCustomVariable(new QName("payload"));
+        variable.setType(XQItemType.XQITEMKIND_DOCUMENT);
+        return variable;
+    }
+}

Modified: webservices/synapse/trunk/java/pom.xml
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/pom.xml?rev=583669&r1=583668&r2=583669&view=diff
==============================================================================
--- webservices/synapse/trunk/java/pom.xml (original)
+++ webservices/synapse/trunk/java/pom.xml Wed Oct 10 18:58:34 2007
@@ -666,7 +666,7 @@
         <dependency>
             <groupId>org.wso2.throttle</groupId>
             <artifactId>wso2throttle</artifactId>
-            <version>${wso2commons.version}</version>
+            <version>SNAPSHOT</version>
             <type>jar</type>
             <exclusions>
                 <exclusion>

Modified: webservices/synapse/trunk/java/repository/conf/sample/synapse_sample_600.xml
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/repository/conf/sample/synapse_sample_600.xml?rev=583669&r1=583668&r2=583669&view=diff
==============================================================================
--- webservices/synapse/trunk/java/repository/conf/sample/synapse_sample_600.xml (original)
+++ webservices/synapse/trunk/java/repository/conf/sample/synapse_sample_600.xml Wed Oct 10 18:58:34 2007
@@ -20,12 +20,13 @@
              xmlns:throttle="http://ws.apache.org/ns/synapse/throttle">
     <sequence name="main">
         <in>
-            <throttle:throttle>
+            <throttle:throttle id="A">
                 <policy>
                     <!-- define throttle policy -->
                     <wsp:Policy xmlns:wsp="http://schemas.xmlsoap.org/ws/2004/09/policy"
                                 xmlns:throttle="http://www.wso2.org/products/wso2commons/throttle">
                         <throttle:ThrottleAssertion>
+                            <throttle:MaximumConcurrentAccess>10</throttle:MaximumConcurrentAccess>
                             <wsp:All>
                                 <throttle:ID throttle:type="IP">Other</throttle:ID>
                                 <wsp:ExactlyOne>
@@ -93,13 +94,15 @@
                         <reason value="**Access Denied**"/>
                     </makefault>
                     <property name="RESPONSE" value="true"/>
-                    <header name="To" expression="get-property('ReplyTo')"/>
+                    <header name="To" action="remove"/>
+                    <throttle:throttle id="A"/>
                     <send/>
                     <drop/>
                 </onReject>
             </throttle:throttle>
         </in>
         <out>
+            <throttle:throttle id="A"/>
             <send/>
         </out>
     </sequence>

Modified: webservices/synapse/trunk/java/repository/conf/sample/synapse_sample_601.xml
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/repository/conf/sample/synapse_sample_601.xml?rev=583669&r1=583668&r2=583669&view=diff
==============================================================================
--- webservices/synapse/trunk/java/repository/conf/sample/synapse_sample_601.xml (original)
+++ webservices/synapse/trunk/java/repository/conf/sample/synapse_sample_601.xml Wed Oct 10 18:58:34 2007
@@ -18,6 +18,7 @@
   -->
 <definitions xmlns="http://ws.apache.org/ns/synapse"
              xmlns:throttle="http://ws.apache.org/ns/synapse/throttle">
+    
     <registry provider="org.apache.synapse.registry.url.SimpleURLRegistry">
         <!-- the root property of the simple URL registry helps resolve a resource URL as root + key -->
         <parameter name="root">file:./../../repository/</parameter>
@@ -29,16 +30,6 @@
     <localEntry key="thottlePolicy"
                 src="file:./repository/conf/sample/resources/policy/throttle_policy.xml"/>
 
-    <sequence name="main">
-        <in>
-            <throttle:throttle onReject="onRejectSequence" onAccept="onAcceptSequence">
-                <policy key="thottlePolicy"/>
-            </throttle:throttle>
-        </in>
-        <out>
-            <send/>
-        </out>
-    </sequence>
     <sequence name="onAcceptSequence">
         <log level="custom">
             <property name="text" value="**Access Accept**"/>
@@ -49,7 +40,7 @@
             </endpoint>
         </send>
     </sequence>
-    <sequence name="onRejectSequence">
+    <sequence name="onRejectSequence" trace="enable">
         <log level="custom">
             <property name="text" value="**Access Denied**"/>
         </log>
@@ -59,8 +50,23 @@
             <reason value="**Access Denied**"/>
         </makefault>
         <property name="RESPONSE" value="true"/>
-        <header name="To" expression="get-property('ReplyTo')"/>
+        <header name="To" action="remove"/>
+        <throttle:throttle id="A"/>
         <send/>
         <drop/>
     </sequence>
+    <proxy name="StockQuoteProxy">
+        <target>
+             <inSequence>
+                <throttle:throttle onReject="onRejectSequence" onAccept="onAcceptSequence" id="A">
+                    <policy key="thottlePolicy"/>
+                </throttle:throttle>
+            </inSequence>
+            <outSequence>
+                <throttle:throttle id="A"/>
+                <send/>
+            </outSequence>
+        </target>
+        <publishWSDL uri="file:repository/conf/sample/resources/proxy/sample_proxy_1.wsdl"/>
+    </proxy>
 </definitions>



---------------------------------------------------------------------
To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: synapse-dev-help@ws.apache.org