You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by in...@apache.org on 2009/06/15 09:47:57 UTC
svn commit: r784678 - in /synapse/trunk/java:
modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java
modules/extensions/src/test/java/org/apache/synapse/mediators/throttle/ThrottleMediatorTest.java
pom.xml
Author: indika
Date: Mon Jun 15 07:47:57 2009
New Revision: 784678
URL: http://svn.apache.org/viewvc?rev=784678&view=rev
Log:
update to throttle version 3.0
Modified:
synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java
synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/throttle/ThrottleMediatorTest.java
synapse/trunk/java/pom.xml
Modified: synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java?rev=784678&r1=784677&r2=784678&view=diff
==============================================================================
--- synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java (original)
+++ synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java Mon Jun 15 07:47:57 2009
@@ -19,30 +19,29 @@
package org.apache.synapse.mediators.throttle;
import org.apache.axiom.om.OMElement;
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.ClusterManager;
+import org.apache.axis2.clustering.context.Replicator;
+import org.apache.axis2.context.ConfigurationContext;
import org.apache.neethi.PolicyEngine;
-import org.apache.synapse.ManagedLifecycle;
import org.apache.synapse.Mediator;
import org.apache.synapse.MessageContext;
+import org.apache.synapse.ManagedLifecycle;
import org.apache.synapse.SynapseLog;
-import org.apache.synapse.transport.nhttp.NhttpConstants;
import org.apache.synapse.config.Entry;
-import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.mediators.AbstractMediator;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.clustering.context.Replicator;
-import org.apache.axis2.clustering.ClusteringFault;
-import org.apache.axis2.clustering.ClusterManager;
+import org.apache.synapse.transport.nhttp.NhttpConstants;
import org.wso2.throttle.*;
-
/**
- * The Mediator for the throttling - Throtting will occur according to the ws-policy
+ * The Mediator for the throttling - Throttling will occur according to the ws-policy
* which is specified as the key for lookup from the registry or the inline policy
- * Only support IP based throttling- Throotling can manage per IP using the throttle policy
+ * Only support IP based throttling- Throttling can manage per IP using the throttle policy
*/
-public class ThrottleMediator extends AbstractMediator implements ManagedLifecycle {
+public class ThrottleMediator extends AbstractMediator {
/* The key for getting the throttling policy - key refers to a/an [registry] entry */
private String policyKey = null;
@@ -78,19 +77,19 @@
public void init(SynapseEnvironment se) {
if (onAcceptMediator instanceof ManagedLifecycle) {
- ((ManagedLifecycle)onAcceptMediator).init(se);
+ ((ManagedLifecycle) onAcceptMediator).init(se);
}
if (onRejectMediator instanceof ManagedLifecycle) {
- ((ManagedLifecycle)onRejectMediator).init(se);
+ ((ManagedLifecycle) onRejectMediator).init(se);
}
}
public void destroy() {
if (onAcceptMediator instanceof ManagedLifecycle) {
- ((ManagedLifecycle)onAcceptMediator).destroy();
+ ((ManagedLifecycle) onAcceptMediator).destroy();
}
if (onRejectMediator instanceof ManagedLifecycle) {
- ((ManagedLifecycle)onRejectMediator).destroy();
+ ((ManagedLifecycle) onRejectMediator).destroy();
}
}
@@ -108,7 +107,7 @@
synLog.traceTrace("Message : " + synCtx.getEnvelope());
}
}
- // To ensure the creation of throttle is thread safe – It is possible create same throttle
+ // To ensure the creation of throttle is thread safe – It is possible create same throttle
// object multiple times by multiple threads.
synchronized (throttleLock) {
@@ -119,10 +118,10 @@
//To ensure check for clustering environment only happens one time
if ((throttle == null && !isResponse) || (isResponse
- && concurrentAccessController == null)) {
+ && concurrentAccessController == null)) {
ClusterManager clusterManager = cc.getAxisConfiguration().getClusterManager();
if (clusterManager != null &&
- clusterManager.getContextManager() != null) {
+ clusterManager.getContextManager() != null) {
isClusteringEnable = true;
}
}
@@ -133,7 +132,7 @@
//if this is a clustered environment
if (isClusteringEnable) {
concurrentAccessController =
- (ConcurrentAccessController) cc.getProperty(key);
+ (ConcurrentAccessController) cc.getProperty(key);
}
// for request messages, read the policy for throttling and initialize
if (inLinePolicy != null) {
@@ -142,12 +141,12 @@
if (synLog.isTraceTraceEnabled()) {
synLog.traceTrace("Initializing using static throttling policy : "
- + inLinePolicy);
+ + inLinePolicy);
}
try {
// process the policy
- throttle = ThrottlePolicyProcessor.processPolicy(
- PolicyEngine.getPolicy(inLinePolicy));
+ throttle = ThrottleFactory.createMediatorThrottle(
+ PolicyEngine.getPolicy(inLinePolicy));
//At this point concurrent access controller definitely 'null'
// f the clustering is disable.
@@ -156,7 +155,7 @@
// that message mediation has occurred through this mediator.
if (throttle != null && concurrentAccessController == null) {
concurrentAccessController =
- throttle.getConcurrentAccessController();
+ throttle.getConcurrentAccessController();
if (concurrentAccessController != null) {
cc.setProperty(key, concurrentAccessController);
}
@@ -174,7 +173,7 @@
Entry entry = synCtx.getConfiguration().getEntryDefinition(policyKey);
if (entry == null) {
handleException("Cannot find throttling policy using key : "
- + policyKey, synCtx);
+ + policyKey, synCtx);
} else {
boolean reCreate = false;
@@ -188,37 +187,37 @@
Object entryValue = synCtx.getEntry(policyKey);
if (entryValue == null) {
handleException(
- "Null throttling policy returned by Entry : "
- + policyKey, synCtx);
+ "Null throttling policy returned by Entry : "
+ + policyKey, synCtx);
} else {
if (!(entryValue instanceof OMElement)) {
handleException("Policy returned from key : " + policyKey +
- " is not an OMElement", synCtx);
+ " is not an OMElement", synCtx);
} else {
- //Check for reload in a cluster environment –
+ //Check for reload in a cluster environment –
// For clustered environment ,if the concurrent access controller
// is not null and throttle is not null , then must reload.
if (isClusteringEnable && concurrentAccessController != null
- && throttle != null) {
+ && throttle != null) {
concurrentAccessController = null; // set null ,
// because need reload
}
try {
// Creates the throttle from the policy
- throttle = ThrottlePolicyProcessor.processPolicy(
- PolicyEngine.getPolicy((OMElement) entryValue));
+ throttle = ThrottleFactory.createMediatorThrottle(
+ PolicyEngine.getPolicy((OMElement) entryValue));
//For non-clustered environment , must re-initiates
//For clustered environment,
//concurrent access controller is null ,
//then must re-initiates
if (throttle != null && (concurrentAccessController == null
- || !isClusteringEnable)) {
+ || !isClusteringEnable)) {
concurrentAccessController =
- throttle.getConcurrentAccessController();
+ throttle.getConcurrentAccessController();
if (concurrentAccessController != null) {
cc.setProperty(key, concurrentAccessController);
} else {
@@ -227,7 +226,7 @@
}
} catch (ThrottleException e) {
handleException("Error processing the throttling policy",
- e, synCtx);
+ e, synCtx);
}
}
}
@@ -238,7 +237,7 @@
// if the message flow path is OUT , then must lookp from ConfigurationContext -
// never create ,just get the existing one
concurrentAccessController =
- (ConcurrentAccessController) cc.getProperty(key);
+ (ConcurrentAccessController) cc.getProperty(key);
}
}
//perform concurrency throttling
@@ -256,12 +255,12 @@
try {
if (synLog.isTraceOrDebugEnabled()) {
synLog.traceOrDebug("Going to replicates the " +
- "states of the ConcurrentAccessController with key : " + key);
+ "states of the ConcurrentAccessController with key : " + key);
}
Replicator.replicate(cc);
} catch (ClusteringFault clusteringFault) {
handleException("Error during the replicating states ",
- clusteringFault, synCtx);
+ clusteringFault, synCtx);
}
}
}
@@ -272,7 +271,7 @@
return mediator.mediate(synCtx);
} else {
handleException("Unable to find onAccept sequence with key : "
- + onAcceptSeqKey, synCtx);
+ + onAcceptSeqKey, synCtx);
}
} else if (onAcceptMediator != null) {
return onAcceptMediator.mediate(synCtx);
@@ -287,7 +286,7 @@
return mediator.mediate(synCtx);
} else {
handleException("Unable to find onReject sequence with key : "
- + onRejectSeqKey, synCtx);
+ + onRejectSeqKey, synCtx);
}
} else if (onRejectMediator != null) {
return onRejectMediator.mediate(synCtx);
@@ -303,8 +302,8 @@
/**
* Helper method that handles the concurrent access through throttle
*
- * @param isResponse Current Message is response or not
- * @param synLog the Synapse log to use
+ * @param isResponse Current Message is response or not
+ * @param synLog the Synapse log to use
* @return true if the caller can access ,o.w. false
*/
private boolean doThrottleByConcurrency(boolean isResponse, SynapseLog synLog) {
@@ -314,7 +313,7 @@
int concurrentLimit = concurrentAccessController.getLimit();
if (synLog.isTraceOrDebugEnabled()) {
synLog.traceOrDebug("Concurrent access controller for ID : " + id +
- " allows : " + concurrentLimit + " concurrent accesses");
+ " allows : " + concurrentLimit + " concurrent accesses");
}
int available;
if (!isResponse) {
@@ -322,14 +321,14 @@
canAcess = available > 0;
if (synLog.isTraceOrDebugEnabled()) {
synLog.traceOrDebug("Concurrency Throttle : Access " +
- (canAcess ? "allowed" : "denied") + " :: " + available
- + " of available of " + concurrentLimit + " connections");
+ (canAcess ? "allowed" : "denied") + " :: " + available
+ + " of available of " + concurrentLimit + " connections");
}
} else {
available = concurrentAccessController.incrementAndGet();
if (synLog.isTraceOrDebugEnabled()) {
synLog.traceOrDebug("Concurrency Throttle : Connection returned" + " :: " +
- available + " of available of " + concurrentLimit + " connections");
+ available + " of available of " + concurrentLimit + " connections");
}
}
}
@@ -339,19 +338,22 @@
/**
* Helper method that handles the access-rate based throttling
*
- * @param synCtx MessageContext(Synapse)
- * @param axisMC MessageContext(Axis2)
- * @param cc ConfigurationContext
- * @param synLog the Synapse log to use
+ * @param synCtx MessageContext(Synapse)
+ * @param axisMC MessageContext(Axis2)
+ * @param cc ConfigurationContext
+ * @param synLog the Synapse log to use
* @return ue if the caller can access ,o.w. false
*/
- private boolean throttleByAccessRate(MessageContext synCtx, org.apache.axis2.context.MessageContext axisMC, ConfigurationContext cc, SynapseLog synLog) {
+ private boolean throttleByAccessRate(MessageContext synCtx,
+ org.apache.axis2.context.MessageContext axisMC,
+ ConfigurationContext cc,
+ SynapseLog synLog) {
String callerId = null;
boolean canAccess = true;
//remote ip of the caller
String remoteIP = (String) axisMC.getPropertyNonReplicable(
- org.apache.axis2.context.MessageContext.REMOTE_ADDR);
+ org.apache.axis2.context.MessageContext.REMOTE_ADDR);
//domain name of the caller
String domainName = (String) axisMC.getPropertyNonReplicable(NhttpConstants.REMOTE_HOST);
@@ -364,7 +366,7 @@
}
// loads the DomainBasedThrottleContext
ThrottleContext context
- = throttle.getThrottleContext(ThrottleConstants.DOMAIN_BASED_THROTTLE_KEY);
+ = throttle.getThrottleContext(ThrottleConstants.DOMAIN_BASED_THROTTLE_KEY);
if (context != null) {
//loads the DomainBasedThrottleConfiguration
ThrottleConfiguration config = context.getThrottleConfiguration();
@@ -381,12 +383,13 @@
try {
//Checks for access state
- canAccess = accessControler.canAccess(context,
- callerId, ThrottleConstants.DOMAIN_BASE);
+ AccessInformation accessInformation = accessControler.canAccess(context,
+ callerId, ThrottleConstants.DOMAIN_BASE);
+ canAccess = accessInformation.isAccessAllowed();
if (synLog.isTraceOrDebugEnabled()) {
synLog.traceOrDebug("Access " + (canAccess ? "allowed" : "denied")
- + " for Domain Name : " + domainName);
+ + " for Domain Name : " + domainName);
}
//In the case of both of concurrency throttling and
@@ -415,7 +418,9 @@
if (callerId == null) {
//do the IP-based throttling
if (remoteIP == null) {
- synLog.traceOrDebug("The IP address of the caller cannot be found");
+ if (synLog.isTraceOrDebugEnabled()) {
+ synLog.traceOrDebug("The IP address of the caller cannot be found");
+ }
canAccess = true;
} else {
@@ -425,7 +430,7 @@
try {
// Loads the IPBasedThrottleContext
ThrottleContext context =
- throttle.getThrottleContext(ThrottleConstants.IP_BASED_THROTTLE_KEY);
+ throttle.getThrottleContext(ThrottleConstants.IP_BASED_THROTTLE_KEY);
if (context != null) {
//Loads the IPBasedThrottleConfiguration
ThrottleConfiguration config = context.getThrottleConfiguration();
@@ -440,13 +445,16 @@
context.setThrottleId(id);
}
//Checks access state
- canAccess = accessControler.canAccess(context,
- callerId, ThrottleConstants.IP_BASE);
+ AccessInformation accessInformation = accessControler.canAccess(
+ context,
+ callerId,
+ ThrottleConstants.IP_BASE);
+ canAccess = accessInformation.isAccessAllowed();
if (synLog.isTraceOrDebugEnabled()) {
synLog.traceOrDebug("Access " +
- (canAccess ? "allowed" : "denied")
- + " for IP : " + remoteIP);
+ (canAccess ? "allowed" : "denied")
+ + " for IP : " + remoteIP);
}
//In the case of both of concurrency throttling and
//rate based throttling have enabled ,
Modified: synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/throttle/ThrottleMediatorTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/throttle/ThrottleMediatorTest.java?rev=784678&r1=784677&r2=784678&view=diff
==============================================================================
--- synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/throttle/ThrottleMediatorTest.java (original)
+++ synapse/trunk/java/modules/extensions/src/test/java/org/apache/synapse/mediators/throttle/ThrottleMediatorTest.java Mon Jun 15 07:47:57 2009
@@ -27,21 +27,23 @@
import org.apache.neethi.PolicyEngine;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseException;
-import org.apache.synapse.config.SynapseConfiguration;
import org.apache.synapse.config.Entry;
-import org.apache.synapse.config.SynapseConfigUtils;
+import org.apache.synapse.config.SynapseConfiguration;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.core.axis2.Axis2SynapseEnvironment;
import org.apache.synapse.mediators.AbstractMediator;
import org.wso2.throttle.*;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
import java.io.ByteArrayInputStream;
+import java.io.StringReader;
/**
* Throttle Mediator Test - This class test throttling when policy has specified as both of
* InLine and a registry key
- *
*/
public class ThrottleMediatorTest extends TestCase {
@@ -50,7 +52,7 @@
private static final String POLICY = " <wsp:Policy xmlns:wsp=\"http://schemas.xmlsoap.org/ws/2004/09/policy\"\n" +
" xmlns:throttle=\"http://www.wso2.org/products/wso2commons/throttle\">\n" +
" <throttle:ThrottleAssertion>\n" +
- " <throttle:MaximumConcurrentAccess>10</throttle:MaximumConcurrentAccess>\n" +
+ " <throttle:MaximumConcurrentAccess>10</throttle:MaximumConcurrentAccess>\n" +
" <wsp:All>\n" +
" <throttle:ID throttle:type=\"IP\">other</throttle:ID>\n" +
" <wsp:ExactlyOne>\n" +
@@ -115,12 +117,24 @@
OMAbstractFactory.getSOAP11Factory().createOMDocument();
omDoc.addChild(envelope);
- envelope.getBody().addChild(SynapseConfigUtils.stringToOM(payload));
+ envelope.getBody().addChild(createOMElement(payload));
synMc.setEnvelope(envelope);
return synMc;
}
+ public static OMElement createOMElement(String xml) {
+ try {
+ XMLStreamReader reader = XMLInputFactory
+ .newInstance().createXMLStreamReader(new StringReader(xml));
+ StAXOMBuilder builder = new StAXOMBuilder(reader);
+ return builder.getDocumentElement();
+ }
+ catch (XMLStreamException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public void testMediate() throws Exception {
ByteArrayInputStream in = new ByteArrayInputStream(POLICY.getBytes());
StAXOMBuilder builde = new StAXOMBuilder(in);
@@ -155,35 +169,36 @@
}
}
+
public void testMediateWithInLineXML() throws Exception {
- ByteArrayInputStream in = new ByteArrayInputStream(POLICY.getBytes());
- StAXOMBuilder build = new StAXOMBuilder(in);
- ThrottleTestMediator throttleMediator = new ThrottleTestMediator();
- throttleMediator.setInLinePolicy(build.getDocumentElement());
- MessageContext synCtx = createLightweightSynapseMessageContext("<empty/>");
- synCtx.setProperty(REMOTE_ADDR, "192.168.8.212");
- SynapseConfiguration synCfg = new SynapseConfiguration();
- synCtx.setConfiguration(synCfg);
- for (int i = 0; i < 6; i++) {
- try {
- throttleMediator.mediate(synCtx);
- Thread.sleep(1000);
- }
- catch (Exception e) {
-
- if (i == 3) {
- assertTrue(e.getMessage().lastIndexOf("IP_BASE") > 0);
- }
- if (i == 4) {
- assertTrue(e.getMessage().lastIndexOf("IP_BASE") > 0);
- }
- if (i == 5) {
- assertTrue(e.getMessage().lastIndexOf("IP_BASE") > 0);
- }
- }
- }
+ ByteArrayInputStream in = new ByteArrayInputStream(POLICY.getBytes());
+ StAXOMBuilder build = new StAXOMBuilder(in);
+ ThrottleTestMediator throttleMediator = new ThrottleTestMediator();
+ throttleMediator.setInLinePolicy(build.getDocumentElement());
+ MessageContext synCtx = createLightweightSynapseMessageContext("<empty/>");
+ synCtx.setProperty(REMOTE_ADDR, "192.168.8.212");
+ SynapseConfiguration synCfg = new SynapseConfiguration();
+ synCtx.setConfiguration(synCfg);
+ for (int i = 0; i < 6; i++) {
+ try {
+ throttleMediator.mediate(synCtx);
+ Thread.sleep(1000);
+ }
+ catch (Exception e) {
+
+ if (i == 3) {
+ assertTrue(e.getMessage().lastIndexOf("IP_BASE") > 0);
+ }
+ if (i == 4) {
+ assertTrue(e.getMessage().lastIndexOf("IP_BASE") > 0);
+ }
+ if (i == 5) {
+ assertTrue(e.getMessage().lastIndexOf("IP_BASE") > 0);
+ }
+ }
+ }
- }
+ }
public class ThrottleTestMediator extends AbstractMediator {
@@ -213,24 +228,28 @@
}
//IP based throttling
- String remoteIP = (String)synContext.getProperty(REMOTE_ADDR);
+ String remoteIP = (String) synContext.getProperty(REMOTE_ADDR);
if (remoteIP == null) {
- throw new ThrottleException("IP address of the caller can not find - Currently only support caller-IP base access control" +
+ throw new ThrottleException("IP address of the caller can not find - " +
+ "Currently only support caller-IP base access control" +
"- Thottling will not happen ");
} else {
ThrottleContext throttleContext
= throttle.getThrottleContext(ThrottleConstants.IP_BASED_THROTTLE_KEY);
if (throttleContext == null) {
- throw new ThrottleException("Can not find a configuartion for IP Based Throttle");
+ throw new ThrottleException("Can not find a configuartion for " +
+ "IP Based Throttle");
}
AccessRateController accessControler;
try {
- accessControler =new AccessRateController();
- boolean canAccess = accessControler.canAccess(throttleContext, remoteIP,ThrottleConstants.IP_BASE);
+ accessControler = new AccessRateController();
+ boolean canAccess = accessControler.canAccess(
+ throttleContext, remoteIP, ThrottleConstants.IP_BASE).isAccessAllowed();
if (!canAccess) {
- throw new SynapseException("Access has currently been denied by the IP_BASE throttle for IP :\t" + remoteIP);
+ throw new SynapseException("Access has currently been denied by" +
+ " the IP_BASE throttle for IP :\t" + remoteIP);
}
return canAccess;
}
@@ -260,7 +279,7 @@
reCreate = true;
}
policyOmElement = (OMElement) entryValue;
- } else if (inLinePolicy != null){
+ } else if (inLinePolicy != null) {
policyOmElement = inLinePolicy;
}
if (policyOmElement == null) {
@@ -280,8 +299,8 @@
protected void createThrottleMetaData(OMElement policyOmElement) {
try {
- throttle = ThrottlePolicyProcessor
- .processPolicy(PolicyEngine.getPolicy(policyOmElement));
+ throttle = ThrottleFactory.createMediatorThrottle(
+ PolicyEngine.getPolicy(policyOmElement));
}
catch (ThrottleException e) {
Modified: synapse/trunk/java/pom.xml
URL: http://svn.apache.org/viewvc/synapse/trunk/java/pom.xml?rev=784678&r1=784677&r2=784678&view=diff
==============================================================================
--- synapse/trunk/java/pom.xml (original)
+++ synapse/trunk/java/pom.xml Mon Jun 15 07:47:57 2009
@@ -862,7 +862,7 @@
<!-- dependencies of Synapse extensions module -->
<wso2commons.version>1.2</wso2commons.version>
<wso2caching.version>3.0</wso2caching.version>
- <wso2throttle.version>1.6</wso2throttle.version>
+ <wso2throttle.version>3.0</wso2throttle.version>
<wso2eventing-api.version>SNAPSHOT</wso2eventing-api.version>
<xbean.version>2.2.0</xbean.version>
<bsf.version>3.0-beta2</bsf.version>