You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@synapse.apache.org by ch...@apache.org on 2007/03/30 10:15:35 UTC
svn commit: r523982 - in /webservices/synapse/trunk/java/modules/core/src:
main/java/org/apache/synapse/config/xml/
main/java/org/apache/synapse/config/xml/endpoints/
main/java/org/apache/synapse/core/axis2/
main/java/org/apache/synapse/endpoints/ main...
Author: chathura_ce
Date: Fri Mar 30 01:15:29 2007
New Revision: 523982
URL: http://svn.apache.org/viewvc?view=rev&rev=523982
Log:
Added HTTP cookie based session dispatcher. Now Loadbalance endpoints can track HTTP sessions.
Improved the failure handling of endpoints. Duration to suspend an leaf level endpoint after a failure can be set using <suspendDurationOnFailure> element.
Parent endpoints determine whether they have failed or not by examine the status of their child endpoints.
Resolved some concurrency issues in endpoints.
Added:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/HttpSessionDispatcher.java
Modified:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/Constants.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/AddressEndpointFactory.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/AddressEndpointSerializer.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/FailoverEndpointSerializer.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointSerializer.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/SALoadbalanceEndpointFactory.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/SALoadbalanceEndpointSerializer.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/WSDLEndpointFactory.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/WSDLEndpointSerializer.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/AddressEndpoint.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/Endpoint.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/FailoverEndpoint.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/IndirectEndpoint.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/SALoadbalanceEndpoint.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/WSDLEndpoint.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/RoundRobin.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/Dispatcher.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SimpleClientSessionDispatcher.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SoapSessionDispatcher.java
webservices/synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/SendMediatorSerializationTest.java
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/Constants.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/Constants.java?view=diff&rev=523982&r1=523981&r2=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/Constants.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/Constants.java Fri Mar 30 01:15:29 2007
@@ -84,7 +84,7 @@
String LOADBALANCE_ELEMENT = "loadbalance";
/** failover only element */
String FAILOVER_ELEMENT = "failover";
- String RETRY_AFTER_FAILURE_TIME = "retryAfterFailure";
+ String SUSPEND_DURATION_ON_FAILURE = "suspendDurationOnFailure";
String MAXIMUM_RETRIES = "maximumRetries";
String RETRY_INTERVAL = "retryInterval";
/** failover attribute in the loadbalance element */
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/AddressEndpointFactory.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/AddressEndpointFactory.java?view=diff&rev=523982&r1=523981&r2=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/AddressEndpointFactory.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/AddressEndpointFactory.java Fri Mar 30 01:15:29 2007
@@ -69,6 +69,23 @@
}
}
+ // set the suspend on fail duration.
+ OMElement suspendElement = epConfig.getFirstChildWithName(new QName(
+ Constants.SYNAPSE_NAMESPACE,
+ org.apache.synapse.config.xml.Constants.SUSPEND_DURATION_ON_FAILURE));
+
+ if (suspendElement != null) {
+ String suspend = suspendElement.getText();
+
+ try {
+ long suspendDuration = Long.parseLong(suspend);
+ addressEndpoint.setSuspendOnFailDuration(suspendDuration);
+
+ } catch (NumberFormatException e) {
+ handleException("suspendDuratiOnFailure should be valid number.");
+ }
+ }
+
OMElement addressElement = epConfig.getFirstChildWithName
(new QName(Constants.SYNAPSE_NAMESPACE, "address"));
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/AddressEndpointSerializer.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/AddressEndpointSerializer.java?view=diff&rev=523982&r1=523981&r2=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/AddressEndpointSerializer.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/AddressEndpointSerializer.java Fri Mar 30 01:15:29 2007
@@ -77,6 +77,18 @@
endpointElement.addAttribute("name", name, null);
}
+ long suspendDuration = addressEndpoint.getSuspendOnFailDuration();
+ if (suspendDuration != Long.MAX_VALUE) {
+ // user has set some value for this. let's serialize it.
+
+ OMElement suspendElement = fac.createOMElement(
+ org.apache.synapse.config.xml.Constants.SUSPEND_DURATION_ON_FAILURE,
+ Constants.SYNAPSE_OMNAMESPACE);
+
+ suspendElement.setText(Long.toString(suspendDuration));
+ endpointElement.addChild(suspendElement);
+ }
+
EndpointDefinition epAddress = addressEndpoint.getEndpoint();
OMElement addressElement = serializeEndpointDefinition(epAddress);
endpointElement.addChild(addressElement);
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/FailoverEndpointSerializer.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/FailoverEndpointSerializer.java?view=diff&rev=523982&r1=523981&r2=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/FailoverEndpointSerializer.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/FailoverEndpointSerializer.java Fri Mar 30 01:15:29 2007
@@ -28,6 +28,7 @@
import org.apache.synapse.Constants;
import java.util.ArrayList;
+import java.util.List;
/**
* Serializes FailoverEndpoint to XML configuration.
@@ -61,7 +62,7 @@
endpointElement.addAttribute("name", name, null);
}
- ArrayList endpoints = failoverEndpoint.getEndpoints();
+ List endpoints = failoverEndpoint.getEndpoints();
for (int i = 0; i < endpoints.size(); i++) {
Endpoint childEndpoint = (Endpoint) endpoints.get(i);
EndpointSerializer serializer = EndpointAbstractSerializer.
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java?view=diff&rev=523982&r1=523981&r2=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java Fri Mar 30 01:15:29 2007
@@ -85,33 +85,11 @@
createLoadbalanceAlgorithm(loadbalanceElement, endpoints);
loadbalanceEndpoint.setAlgorithm(algorithm);
- // set abandon time
- long abandonTime = 0;
- OMAttribute atAttribute = loadbalanceElement.getAttribute
- (new QName(null, org.apache.synapse.config.xml.Constants.RETRY_AFTER_FAILURE_TIME));
- if(atAttribute != null) {
- String at = atAttribute.getAttributeValue();
- abandonTime = Long.parseLong(at);
- loadbalanceEndpoint.setAbandonTime(abandonTime);
+ // set if failover is turned off
+ String failover = loadbalanceElement.getAttributeValue(new QName("failover"));
+ if (failover != null && failover.equalsIgnoreCase("false")) {
+ loadbalanceEndpoint.setFailover(false);
}
-
- //long retryInterval = 30000;
- //OMAttribute riAttribute = loadbalanceElement.getAttribute
- // (new QName(null, Constants.RETRY_INTERVAL));
- //
- //if(riAttribute != null) {
- // String ri = riAttribute.getAttributeValue();
- // retryInterval = Long.parseLong(ri);
- //}
-
- //int maximumRetries = 0;
- //OMAttribute mrAttribute = loadbalanceElement.getAttribute
- // (new QName(null, Constants.MAXIMUM_RETRIES));
- //
- //if(mrAttribute != null) {
- // String mr = mrAttribute.getAttributeValue();
- // maximumRetries = Integer.parseInt(mr);
- //}
return loadbalanceEndpoint;
}
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointSerializer.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointSerializer.java?view=diff&rev=523982&r1=523981&r2=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointSerializer.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointSerializer.java Fri Mar 30 01:15:29 2007
@@ -30,6 +30,7 @@
import org.apache.synapse.Constants;
import java.util.ArrayList;
+import java.util.List;
/**
* Serializes LoadbalanceEndpoint to an XML configuration.
@@ -68,9 +69,15 @@
if (algorithm instanceof RoundRobin) {
algorithmName = "roundRobin";
}
- loadbalanceElement.addAttribute("algorithm", algorithmName, null);
+ loadbalanceElement.addAttribute
+ (org.apache.synapse.config.xml.Constants.ALGORITHM_NAME, algorithmName, null);
- ArrayList endpoints = loadbalanceEndpoint.getEndpoints();
+ // set if failover is turned off in the endpoint
+ if (!loadbalanceEndpoint.isFailover()) {
+ loadbalanceElement.addAttribute("failover", "false", null);
+ }
+
+ List endpoints = loadbalanceEndpoint.getEndpoints();
for (int i = 0; i < endpoints.size(); i++) {
Endpoint childEndpoint = (Endpoint) endpoints.get(i);
EndpointSerializer serializer = EndpointAbstractSerializer.
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/SALoadbalanceEndpointFactory.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/SALoadbalanceEndpointFactory.java?view=diff&rev=523982&r1=523981&r2=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/SALoadbalanceEndpointFactory.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/SALoadbalanceEndpointFactory.java Fri Mar 30 01:15:29 2007
@@ -27,6 +27,7 @@
import org.apache.synapse.endpoints.dispatch.Dispatcher;
import org.apache.synapse.endpoints.dispatch.SoapSessionDispatcher;
import org.apache.synapse.endpoints.dispatch.SimpleClientSessionDispatcher;
+import org.apache.synapse.endpoints.dispatch.HttpSessionDispatcher;
import org.apache.synapse.endpoints.algorithms.LoadbalanceAlgorithm;
import org.apache.synapse.Constants;
import org.apache.synapse.SynapseException;
@@ -76,6 +77,10 @@
if (type.equalsIgnoreCase("soap")) {
Dispatcher soapDispatcher = new SoapSessionDispatcher();
loadbalanceEndpoint.setDispatcher(soapDispatcher);
+
+ } else if (type.equalsIgnoreCase("http")) {
+ Dispatcher httpDispatcher = new HttpSessionDispatcher();
+ loadbalanceEndpoint.setDispatcher(httpDispatcher);
} else if (type.equalsIgnoreCase("simpleClientSession")) {
Dispatcher csDispatcher = new SimpleClientSessionDispatcher();
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/SALoadbalanceEndpointSerializer.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/SALoadbalanceEndpointSerializer.java?view=diff&rev=523982&r1=523981&r2=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/SALoadbalanceEndpointSerializer.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/SALoadbalanceEndpointSerializer.java Fri Mar 30 01:15:29 2007
@@ -27,6 +27,7 @@
import org.apache.synapse.endpoints.dispatch.Dispatcher;
import org.apache.synapse.endpoints.dispatch.SoapSessionDispatcher;
import org.apache.synapse.endpoints.dispatch.SimpleClientSessionDispatcher;
+import org.apache.synapse.endpoints.dispatch.HttpSessionDispatcher;
import org.apache.synapse.endpoints.algorithms.LoadbalanceAlgorithm;
import org.apache.synapse.endpoints.algorithms.RoundRobin;
import org.apache.synapse.SynapseException;
@@ -36,6 +37,7 @@
import javax.xml.namespace.QName;
import java.util.ArrayList;
+import java.util.List;
public class SALoadbalanceEndpointSerializer implements EndpointSerializer {
@@ -67,6 +69,11 @@
sessionElement.addAttribute("type", "soap", null);
endpointElement.addChild(sessionElement);
+ } else if (dispatcher instanceof HttpSessionDispatcher) {
+ OMElement sessionElement = fac.createOMElement("session", Constants.SYNAPSE_OMNAMESPACE);
+ sessionElement.addAttribute("type", "http", null);
+ endpointElement.addChild(sessionElement);
+
} else if (dispatcher instanceof SimpleClientSessionDispatcher) {
OMElement sessionElement = fac.createOMElement("session", Constants.SYNAPSE_OMNAMESPACE);
sessionElement.addAttribute("type", "clientSession", null);
@@ -83,7 +90,7 @@
}
loadbalanceElement.addAttribute("algorithm", algorithmName, null);
- ArrayList endpoints = loadbalanceEndpoint.getEndpoints();
+ List endpoints = loadbalanceEndpoint.getEndpoints();
for (int i = 0; i < endpoints.size(); i++) {
Endpoint childEndpoint = (Endpoint) endpoints.get(i);
EndpointSerializer serializer = EndpointAbstractSerializer.
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/WSDLEndpointFactory.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/WSDLEndpointFactory.java?view=diff&rev=523982&r1=523981&r2=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/WSDLEndpointFactory.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/WSDLEndpointFactory.java Fri Mar 30 01:15:29 2007
@@ -81,6 +81,22 @@
}
}
+ // set the suspend on fail duration.
+ OMElement suspendElement = epConfig.getFirstChildWithName
+ (new QName(Constants.SYNAPSE_NAMESPACE, "suspendOnFailDuration"));
+
+ if (suspendElement != null) {
+ String suspend = suspendElement.getText();
+
+ try {
+ long suspendDuration = Long.parseLong(suspend);
+ wsdlEndpoint.setSuspendOnFailDuration(suspendDuration);
+
+ } catch (NumberFormatException e) {
+ handleException("suspendOnFailDuration should be valid number.");
+ }
+ }
+
OMElement wsdlElement = epConfig.getFirstChildWithName
(new QName(Constants.SYNAPSE_NAMESPACE, "wsdl"));
@@ -90,11 +106,15 @@
// get the service name and port name. at this point we should not worry about the presence
// of those parameters. they are handled by corresponding WSDL builders.
- String serviceName = wsdlElement.getAttributeValue(new QName(org.apache.synapse.config.xml.Constants.NULL_NAMESPACE,"service"));
- String portName = wsdlElement.getAttributeValue(new QName(org.apache.synapse.config.xml.Constants.NULL_NAMESPACE,"port"));
+ String serviceName = wsdlElement.getAttributeValue
+ (new QName(org.apache.synapse.config.xml.Constants.NULL_NAMESPACE,"service"));
+
+ String portName = wsdlElement.getAttributeValue
+ (new QName(org.apache.synapse.config.xml.Constants.NULL_NAMESPACE,"port"));
// check if wsdl is supplied as a URI
- String wsdlURI = wsdlElement.getAttributeValue(new QName(org.apache.synapse.config.xml.Constants.NULL_NAMESPACE,"uri"));
+ String wsdlURI = wsdlElement.getAttributeValue
+ (new QName(org.apache.synapse.config.xml.Constants.NULL_NAMESPACE,"uri"));
// set serviceName and portName in the endpoint. it does not matter if these are
// null at this point. we are setting them only for serialization purpose.
@@ -106,7 +126,9 @@
try {
URL wsdlURL = new URL(wsdlURI);
- StAXOMBuilder OMBuilder = new StAXOMBuilder(wsdlURL.openConnection().getInputStream());
+ StAXOMBuilder OMBuilder = new StAXOMBuilder
+ (wsdlURL.openConnection().getInputStream());
+
OMElement docElement = OMBuilder.getDocumentElement();
String ns = docElement.getNamespace().getNamespaceURI();
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/WSDLEndpointSerializer.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/WSDLEndpointSerializer.java?view=diff&rev=523982&r1=523981&r2=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/WSDLEndpointSerializer.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/WSDLEndpointSerializer.java Fri Mar 30 01:15:29 2007
@@ -60,6 +60,16 @@
endpointElement.addAttribute("name", name, null);
}
+ long suspendDuration = wsdlEndpoint.getSuspendOnFailDuration();
+ if (suspendDuration != Long.MAX_VALUE) {
+ // user has set some value for this. let's serialize it.
+
+ OMElement suspendElement = fac.createOMElement
+ ("suspendOnFailDuration", Constants.SYNAPSE_OMNAMESPACE);
+ suspendElement.setText(Long.toString(suspendDuration));
+ endpointElement.addChild(suspendElement);
+ }
+
OMElement wsdlElement = fac.createOMElement("wsdl", Constants.SYNAPSE_OMNAMESPACE);
String serviceName = wsdlEndpoint.getServiceName();
if (serviceName != null) {
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java?view=diff&rev=523982&r1=523981&r2=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java Fri Mar 30 01:15:29 2007
@@ -207,6 +207,10 @@
newMC.setEnvelope(ori.getEnvelope());
removeAddressingHeaders(newMC);
+ // pass any transport headers on the original request
+ newMC.setProperty(MessageContext.TRANSPORT_HEADERS,
+ ori.getProperty(MessageContext.TRANSPORT_HEADERS));
+
return newMC;
}
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/AddressEndpoint.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/AddressEndpoint.java?view=diff&rev=523982&r1=523981&r2=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/AddressEndpoint.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/AddressEndpoint.java Fri Mar 30 01:15:29 2007
@@ -40,11 +40,42 @@
private static final Log log = LogFactory.getLog(AddressEndpoint.class);
+ /**
+ * Name of the endpoint. Used for named endpoints which can be referred using the key attribute
+ * of indirect endpoints.
+ */
private String name;
- private boolean active = true;
+
+ /**
+ * Determines if this endpoint is active or not. This variable have to be loaded always from the
+ * memory as multiple threads could access it.
+ */
+ private volatile boolean active = true;
+
+ /**
+ * Stores the endpoint details for this endpoint. Details include EPR, WS-Addressing information,
+ * WS-Security information, etc.
+ */
private EndpointDefinition endpoint = null;
+
+ /**
+ * Parent endpoint of this endpoint if this used inside another endpoint. Possible parents are
+ * LoadbalanceEndpoint, SALoadbalanceEndpoint and FailoverEndpoint objects.
+ */
private Endpoint parentEndpoint = null;
+ /**
+ * Leaf level endpoints will be suspended for the specified time by this variable, after a
+ * failure. If this is not explicitly set, endpoints will be suspended forever.
+ */
+ private long suspendOnFailDuration = Long.MAX_VALUE;
+
+ /**
+ * Time to recover a failed endpoint. Value of this is calculated when endpoint is set as
+ * failed by adding suspendOnFailDuration to current time.
+ */
+ private long recoverOn = Long.MAX_VALUE;
+
public EndpointDefinition getEndpoint() {
return endpoint;
}
@@ -61,11 +92,43 @@
this.name = name;
}
- public boolean isActive() {
+ /**
+ * Checks if the endpoint is active (failed or not). If endpoint is in failed state and
+ * suspendOnFailDuration has elapsed, it will be set to active.
+ *
+ * @param synMessageContext MessageContext of the current message. This is not used here.
+ *
+ * @return true if endpoint is active. false otherwise.
+ */
+ public boolean isActive(MessageContext synMessageContext) {
+
+ if (!active) {
+ if (System.currentTimeMillis() > recoverOn) {
+ active = true;
+ recoverOn = 0;
+ }
+ }
+
return active;
}
- public void setActive(boolean active) {
+ /**
+ * Sets if endpoint active or not. if endpoint is set as failed (active = false), the recover on
+ * time is calculated so that it will be activated after the recover on time.
+ *
+ * @param active true if active. false otherwise.
+ *
+ * @param synMessageContext MessageContext of the current message. This is not used here.
+ */
+ public synchronized void setActive(boolean active, MessageContext synMessageContext) {
+
+ // this is synchronized as recoverOn can be set to unpredictable values if two threads call
+ // this method simultaneously.
+
+ if (!active && suspendOnFailDuration != Long.MAX_VALUE) {
+ recoverOn = System.currentTimeMillis() + suspendOnFailDuration;
+ }
+
this.active = active;
}
@@ -146,7 +209,9 @@
synCtx.setProperty(Constants.OUTFLOW_ADDRESSING_ON, Boolean.TRUE);
}
+ // register this as the immediate fault handler for this message.
synCtx.pushFaultHandler(this);
+
synCtx.getEnvironment().send(endpoint, synCtx);
}
}
@@ -159,10 +224,20 @@
this.parentEndpoint = parentEndpoint;
}
+ public long getSuspendOnFailDuration() {
+ return suspendOnFailDuration;
+ }
+
+ public void setSuspendOnFailDuration(long suspendOnFailDuration) {
+ this.suspendOnFailDuration = suspendOnFailDuration;
+ }
+
public void onFault(MessageContext synCtx) {
// perform retries here
// if this endpoint has actually failed, inform the parent.
+ setActive(false, synCtx);
+
if (parentEndpoint != null) {
parentEndpoint.onChildEndpointFail(this, synCtx);
} else {
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/Endpoint.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/Endpoint.java?view=diff&rev=523982&r1=523981&r2=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/Endpoint.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/Endpoint.java Fri Mar 30 01:15:29 2007
@@ -77,9 +77,13 @@
* Returns if the endpoint is currently active or not. Messages should not be sent to inactive
* endpoints.
*
+ * @param synMessageContext MessageContext for the current message. This is required for
+ * IndirectEndpoints where the actual endpoint is retrieved from the MessageContext. Other
+ * Endpoint implementations may ignore this parameter.
+ *
* @return true if the endpoint is in active state. false otherwise.
*/
- public boolean isActive();
+ public boolean isActive(MessageContext synMessageContext);
/**
* Sets the endpoint as active or inactive. If an endpoint is detected as failed, it should be
@@ -87,6 +91,10 @@
* avoid ignoring endpoints forever.
*
* @param active true if active. false otherwise.
+ *
+ * @param synMessageContext MessageContext for the current message. This is required for
+ * IndirectEndpoints where the actual endpoint is retrieved from the MessageContext. Other
+ * Endpoint implementations may ignore this parameter.
*/
- public void setActive(boolean active);
+ public void setActive(boolean active, MessageContext synMessageContext);
}
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/FailoverEndpoint.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/FailoverEndpoint.java?view=diff&rev=523982&r1=523981&r2=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/FailoverEndpoint.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/FailoverEndpoint.java Fri Mar 30 01:15:29 2007
@@ -22,7 +22,7 @@
import org.apache.synapse.FaultHandler;
import org.apache.synapse.MessageContext;
-import java.util.ArrayList;
+import java.util.List;
/**
* FailoverEndpoint can have multiple child endpoints. It will always try to send messages to current
@@ -34,20 +34,45 @@
*/
public class FailoverEndpoint implements Endpoint {
+ /**
+ * Name of the endpoint. Used for named endpoints which can be referred using the key attribute
+ * of indirect endpoints.
+ */
private String name = null;
- private boolean active = true;
- private ArrayList endpoints = null;
+
+ /**
+ * Determine whether this endpoint is active or not. This is active iff all child endpoints of
+ * this endpoint is active. This is always loaded from the memory as it could be accessed from
+ * multiple threads simultaneously.
+ */
+ private volatile boolean active = true;
+
+ /**
+ * List of child endpoints. Failover sending is done among these. Any object implementing the
+ * Endpoint interface can be a child.
+ */
+ private List endpoints = null;
+
+ /**
+ * Endpoint for which currently sending the SOAP traffic.
+ */
private Endpoint currentEndpoint = null;
+
+ /**
+ * Parent endpoint of this endpoint if this used inside another endpoint. Possible parents are
+ * LoadbalanceEndpoint, SALoadbalanceEndpoint and FailoverEndpoint objects. But use of
+ * SALoadbalanceEndpoint as the parent is the logical scenario.
+ */
private Endpoint parentEndpoint = null;
public void send(MessageContext synMessageContext) {
// We have to build the envelop if we are supporting failover.
// Failover should sent the original message multiple times if failures occur. So we have to
- // access the envelop multiple times.
- synMessageContext.getEnvelope().build();
+ // access the envelop multiple times.
+ synMessageContext.getEnvelope().build();
- if (currentEndpoint.isActive()) {
+ if (currentEndpoint.isActive(synMessageContext)) {
currentEndpoint.send(synMessageContext);
} else {
@@ -55,7 +80,7 @@
boolean foundEndpoint = false;
for (int i = 0; i < endpoints.size(); i++) {
liveEndpoint = (Endpoint) endpoints.get(i);
- if (liveEndpoint.isActive()) {
+ if (liveEndpoint.isActive(synMessageContext)) {
foundEndpoint = true;
currentEndpoint = liveEndpoint;
currentEndpoint.send(synMessageContext);
@@ -64,6 +89,9 @@
}
if (!foundEndpoint) {
+ // there are no active child endpoints. so mark this endpoint as failed.
+ setActive(false, synMessageContext);
+
if (parentEndpoint != null) {
parentEndpoint.onChildEndpointFail(this, synMessageContext);
} else {
@@ -84,19 +112,43 @@
this.name = name;
}
- public boolean isActive() {
- return this.active;
+ /**
+ * If this endpoint is in inactive state, checks if all immediate child endpoints are still
+ * failed. If so returns false. If at least one child endpoint is in active state, sets this
+ * endpoint's state to active and returns true.
+ *
+ * @param synMessageContext MessageContext of the current message. This is not used here.
+ *
+ * @return true if active. false otherwise.
+ */
+ public boolean isActive(MessageContext synMessageContext) {
+
+ if (!active) {
+ for (int i = 0; i < endpoints.size(); i++) {
+ Endpoint endpoint = (Endpoint) endpoints.get(i);
+ if (endpoint.isActive(synMessageContext)) {
+ active = true;
+
+ // don't break the loop though we found one active endpoint. calling isActive()
+ // on all child endpoints will update their active state. so this is a good
+ // time to do that.
+ }
+ }
+ }
+
+ return active;
}
- public void setActive(boolean active) {
+ public void setActive(boolean active, MessageContext synMessageContext) {
+ // setting a volatile boolean value is thread safe.
this.active = active;
}
- public ArrayList getEndpoints() {
+ public List getEndpoints() {
return endpoints;
}
- public void setEndpoints(ArrayList endpoints) {
+ public void setEndpoints(List endpoints) {
this.endpoints = endpoints;
if (endpoints.size() > 0) {
currentEndpoint = (Endpoint) endpoints.get(0);
@@ -104,7 +156,6 @@
}
public void onChildEndpointFail(Endpoint endpoint, MessageContext synMessageContext) {
- endpoint.setActive(false);
send(synMessageContext);
}
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/IndirectEndpoint.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/IndirectEndpoint.java?view=diff&rev=523982&r1=523981&r2=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/IndirectEndpoint.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/IndirectEndpoint.java Fri Mar 30 01:15:29 2007
@@ -40,6 +40,12 @@
private boolean active = true;
private Endpoint parentEndpoint = null;
+ /**
+ * This should have a reference to the current message context as it gets the referred endpoint
+ * from it.
+ */
+ private MessageContext currentMsgCtx = null;
+
public void send(MessageContext synMessageContext) {
// get the actual endpoint and send
Endpoint endpoint = synMessageContext.getEndpoint(key);
@@ -47,7 +53,7 @@
handleException("Reference to non-existent endpoint for key : " + key);
}
- if (endpoint.isActive()) {
+ if (endpoint.isActive(synMessageContext)) {
endpoint.send(synMessageContext);
} else {
parentEndpoint.onChildEndpointFail(this, synMessageContext);
@@ -70,12 +76,38 @@
this.key = key;
}
- public boolean isActive() {
- return active;
+ /**
+ * IndirectEndpoints are active if its referref endpoint is active and vise versa. Therefore,
+ * this returns if its referred endpoint is active or not.
+ *
+ * @param synMessageContext MessageContext of the current message.
+ *
+ * @return true if the referred endpoint is active. false otherwise.
+ */
+ public boolean isActive(MessageContext synMessageContext) {
+ Endpoint endpoint = synMessageContext.getEndpoint(key);
+ if (endpoint == null) {
+ handleException("Reference to non-existent endpoint for key : " + key);
+ }
+
+ return endpoint.isActive(synMessageContext);
}
- public void setActive(boolean active) {
- this.active = active;
+ /**
+ * Activating or deactivating an IndirectEndpoint is the activating or deactivating its
+ * referref endpoint. Therefore, this sets the active state of its referred endpoint.
+ *
+ * @param active true if active. false otherwise.
+ *
+ * @param synMessageContext MessageContext of the current message.
+ */
+ public void setActive(boolean active, MessageContext synMessageContext) {
+ Endpoint endpoint = synMessageContext.getEndpoint(key);
+ if (endpoint == null) {
+ handleException("Reference to non-existent endpoint for key : " + key);
+ }
+
+ endpoint.setActive(active, synMessageContext);
}
public void setParentEndpoint(Endpoint parentEndpoint) {
@@ -83,7 +115,6 @@
}
public void onChildEndpointFail(Endpoint endpoint, MessageContext synMessageContext) {
- endpoint.setActive(false);
parentEndpoint.onChildEndpointFail(this, synMessageContext);
}
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java?view=diff&rev=523982&r1=523981&r2=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java Fri Mar 30 01:15:29 2007
@@ -24,6 +24,7 @@
import org.apache.synapse.endpoints.algorithms.LoadbalanceAlgorithm;
import java.util.ArrayList;
+import java.util.List;
/**
* Load balance endpoint can have multiple endpoints. It will route messages according to the
@@ -36,26 +37,67 @@
*/
public class LoadbalanceEndpoint implements Endpoint {
- private ArrayList endpoints = null;
- private long abandonTime = 0;
- private LoadbalanceAlgorithm algorithm = null;
- private int maximumRetries = 1;
- private long retryInterval = 30000;
+ /**
+ * Name of the endpoint. Used for named endpoints which can be referred using the key attribute
+ * of indirect endpoints.
+ */
private String name = null;
- private boolean active = true;
+
+ /**
+ * List of endpoints among which the load is distributed. Any object implementing the Endpoint
+ * interface could be used.
+ */
+ private List endpoints = null;
+
+ /**
+ * Algorithm used for selecting the next endpoint to direct the load. Default is RoundRobin.
+ */
+ private LoadbalanceAlgorithm algorithm = null;
+
+ /**
+ * Determine whether this endpoint is active or not. This is active iff all child endpoints of
+ * this endpoint is active. This is always loaded from the memory as it could be accessed from
+ * multiple threads simultaneously.
+ */
+ private volatile boolean active = true;
+
+ /**
+ * If this supports load balancing with failover. If true, request will be directed to the next
+ * endpoint if the current one is failing.
+ */
+ private boolean failover = true;
+
+ /**
+ * Parent endpoint of this endpoint if this used inside another endpoint. Possible parents are
+ * LoadbalanceEndpoint, SALoadbalanceEndpoint and FailoverEndpoint objects.
+ */
private Endpoint parentEndpoint = null;
public void send(MessageContext synMessageContext) {
Endpoint endpoint = algorithm.getNextEndpoint(synMessageContext);
if (endpoint != null) {
+
+ // We have to build the envelop if we are supporting failover.
+ // Failover should sent the original message multiple times if failures occur. So we have to
+ // access the envelop multiple times.
+ if (failover) {
+ synMessageContext.getEnvelope().build();
+ }
+
endpoint.send(synMessageContext);
+
} else {
+ // there are no active child endpoints. so mark this endpoint as failed.
+ setActive(false, synMessageContext);
+
if (parentEndpoint != null) {
parentEndpoint.onChildEndpointFail(this, synMessageContext);
} else {
Object o = synMessageContext.getFaultStack().pop();
- ((FaultHandler) o).handleFault(synMessageContext);
+ if (o != null) {
+ ((FaultHandler) o).handleFault(synMessageContext);
+ }
}
}
}
@@ -76,44 +118,53 @@
this.algorithm = algorithm;
}
- public int getMaximumRetries() {
- return maximumRetries;
- }
-
- public void setMaximumRetries(int maximumRetries) {
- this.maximumRetries = maximumRetries;
- }
-
- public long getRetryInterval() {
- return retryInterval;
- }
-
- public void setRetryInterval(long retryInterval) {
- this.retryInterval = retryInterval;
- }
+ /**
+ * If this endpoint is in inactive state, checks if all immediate child endpoints are still
+ * failed. If so returns false. If at least one child endpoint is in active state, sets this
+ * endpoint's state to active and returns true. As this a sessionless load balancing endpoint
+ * having one active child endpoint is enough to consider this as active.
+ *
+ * @param synMessageContext MessageContext of the current message. This is not used here.
+ *
+ * @return true if active. false otherwise.
+ */
+ public boolean isActive(MessageContext synMessageContext) {
+
+ if (!active) {
+ for (int i = 0; i < endpoints.size(); i++) {
+ Endpoint endpoint = (Endpoint) endpoints.get(i);
+ if (endpoint.isActive(synMessageContext)) {
+ active = true;
+
+ // don't break the loop though we found one active endpoint. calling isActive()
+ // on all child endpoints will update their active state. so this is a good
+ // time to do that.
+ }
+ }
+ }
- public boolean isActive() {
return active;
}
- public void setActive(boolean active) {
+ public void setActive(boolean active, MessageContext synMessageContext) {
+ // setting a volatile boolean variable is thread safe.
this.active = active;
}
- public ArrayList getEndpoints() {
- return endpoints;
+ public boolean isFailover() {
+ return failover;
}
- public void setEndpoints(ArrayList endpoints) {
- this.endpoints = endpoints;
+ public void setFailover(boolean failover) {
+ this.failover = failover;
}
- public long getAbandonTime() {
- return abandonTime;
+ public List getEndpoints() {
+ return endpoints;
}
- public void setAbandonTime(long abandonTime) {
- this.abandonTime = abandonTime;
+ public void setEndpoints(List endpoints) {
+ this.endpoints = endpoints;
}
public void setParentEndpoint(Endpoint parentEndpoint) {
@@ -121,7 +172,19 @@
}
public void onChildEndpointFail(Endpoint endpoint, MessageContext synMessageContext) {
- endpoint.setActive(false);
- send(synMessageContext);
+
+ // resend (to a different endpoint) only if we support failover
+ if (failover) {
+ send(synMessageContext);
+ } else {
+ // we are not informing this to the parent endpoint as the failure of this loadbalance
+ // endpoint. there can be more active endpoints under this, and current request has
+ // failed only because the currently selected child endpoint has failed AND failover is
+ // turned off in this load balance endpoint. so just call the next fault handler.
+ Object o = synMessageContext.getFaultStack().pop();
+ if (o != null) {
+ ((FaultHandler) o).handleFault(synMessageContext);
+ }
+ }
}
}
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/SALoadbalanceEndpoint.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/SALoadbalanceEndpoint.java?view=diff&rev=523982&r1=523981&r2=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/SALoadbalanceEndpoint.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/SALoadbalanceEndpoint.java Fri Mar 30 01:15:29 2007
@@ -23,8 +23,11 @@
import org.apache.synapse.endpoints.dispatch.Dispatcher;
import org.apache.synapse.MessageContext;
import org.apache.synapse.FaultHandler;
+import org.apache.synapse.SynapseException;
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.axis2.context.OperationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import java.util.ArrayList;
import java.util.List;
@@ -50,11 +53,44 @@
*/
public class SALoadbalanceEndpoint implements Endpoint {
- private ArrayList endpoints = null;
- private LoadbalanceAlgorithm algorithm = null;
+ private static final Log log = LogFactory.getLog(SALoadbalanceEndpoint.class);
+
+ private static final String FIRST_MESSAGE_IN_SESSION = "first_message_in_session";
+
+ /**
+ * Name of the endpoint. Used for named endpoints which can be referred using the key attribute
+ * of indirect endpoints.
+ */
private String name = null;
- private boolean active = true;
+
+ /**
+ * List of endpoints among which the load is distributed. Any object implementing the Endpoint
+ * interface could be used.
+ */
+ private List endpoints = null;
+
+ /**
+ * Algorithm used for selecting the next endpoint to direct the first request of sessions.
+ * Default is RoundRobin.
+ */
+ private LoadbalanceAlgorithm algorithm = null;
+
+ /**
+ * Determine whether this endpoint is active or not. This is always loaded from the memory as it
+ * could be accessed from multiple threads simultaneously.
+ */
+ private volatile boolean active = true;
+
+ /**
+ * Parent endpoint of this endpoint if this used inside another endpoint. Although any endpoint
+ * can be the parent, only SALoadbalanceEndpoint should be used here. Use of any other endpoint
+ * would invalidate the session.
+ */
private Endpoint parentEndpoint = null;
+
+ /**
+ * Dispatcher used for session affinity.
+ */
private Dispatcher dispatcher = null;
public void send(MessageContext synMessageContext) {
@@ -65,16 +101,19 @@
// associated for that session.
endpoint = dispatcher.getEndpoint(synMessageContext);
if (endpoint == null) {
+
// there is no endpoint associated with this session. get a new endpoint using the
// load balance policy.
endpoint = algorithm.getNextEndpoint(synMessageContext);
// this is a start of a new session. so update session map.
if (dispatcher.isServerInitiatedSession()) {
+
// add this endpoint to the endpoint sequence of operation context.
Axis2MessageContext axis2MsgCtx = (Axis2MessageContext) synMessageContext;
OperationContext opCtx = axis2MsgCtx.getAxis2MessageContext().getOperationContext();
Object o = opCtx.getProperty("endpointList");
+
if (o != null) {
List endpointList = (List) o;
endpointList.add(this);
@@ -84,7 +123,9 @@
if (!(endpoint instanceof SALoadbalanceEndpoint)) {
endpointList.add(endpoint);
}
+
} else {
+
// this is the first endpoint in the heirachy. so create the queue and insert
// this as the first element.
List endpointList = new ArrayList();
@@ -102,17 +143,33 @@
} else {
dispatcher.updateSession(synMessageContext, endpoint);
}
+
+ // this is the first request. so an endpoint has not been bound to this session and we
+ // are free to failover if the currently selected endpoint is not working. but for
+ // failover to work, we have to build the soap envelope.
+ //synMessageContext.getEnvelope().build();
+
+ // we should also indicate that this is the first message in the session. so that
+ // onFault(...) method can resend only the failed attempts for the first message.
+ synMessageContext.setProperty(FIRST_MESSAGE_IN_SESSION, Boolean.TRUE);
}
if (endpoint != null) {
- endpoint.send(synMessageContext);
- } else {
- if (parentEndpoint != null) {
- parentEndpoint.onChildEndpointFail(this, synMessageContext);
+
+ // endpoints given by session dispatchers may not be active. therefore, we have check
+ // it here.
+ if (endpoint.isActive(synMessageContext)) {
+ //synMessageContext.getEnvelope().build(); todo: why this is needed here
+ endpoint.send(synMessageContext);
} else {
- Object o = synMessageContext.getFaultStack().pop();
- ((FaultHandler) o).handleFault(synMessageContext);
+ informFailure(synMessageContext);
}
+
+ } else {
+
+ // all child endpoints have failed. so mark this also as failed.
+ setActive(false, synMessageContext);
+ informFailure(synMessageContext);
}
}
@@ -123,6 +180,7 @@
* @param endpointList
*/
public void updateSession(MessageContext responseMsgCtx, List endpointList) {
+
Endpoint endpoint = (Endpoint) endpointList.remove(0);
dispatcher.updateSession(responseMsgCtx, endpoint);
if (endpoint instanceof SALoadbalanceEndpoint) {
@@ -146,19 +204,33 @@
this.algorithm = algorithm;
}
- public boolean isActive() {
+ /**
+ * This is active in below conditions:
+ * If a session is not started AND at least one child endpoint is active.
+ * If a session is started AND the binding endpoint is active.
+ *
+ * This is not active for all other conditions.
+ *
+ * @param synMessageContext MessageContext of the current message. This is used to determine the
+ * session.
+ *
+ * @return true is active. false otherwise.
+ */
+ public boolean isActive(MessageContext synMessageContext) {
+ // todo: implement above
+
return active;
}
- public void setActive(boolean active) {
+ public void setActive(boolean active, MessageContext synMessageContext) {
this.active = active;
}
- public ArrayList getEndpoints() {
+ public List getEndpoints() {
return endpoints;
}
- public void setEndpoints(ArrayList endpoints) {
+ public void setEndpoints(List endpoints) {
this.endpoints = endpoints;
}
@@ -175,17 +247,51 @@
}
/**
- * It is logically incorrect to failover a session affinity endpoint. If we redirect a message
- * belonging to a particular session, new endpoint is not aware of the session. So we can't handle
- * anything more at the endpoint level. Therefore, this method just deactivate the failed
- * endpoint and give the fault to the next fault handler.
+ * It is logically incorrect to failover a session affinity endpoint after the session has started.
+ * If we redirect a message belonging to a particular session, new endpoint is not aware of the
+ * session. So we can't handle anything more at the endpoint level. Therefore, this method just
+ * deactivate the failed endpoint and give the fault to the next fault handler.
+ *
+ * But if the session has not started (i.e. first message), the message will be resend by binding
+ * it to a different endpoint.
*
* @param endpoint Failed endpoint.
* @param synMessageContext MessageContext of the failed message.
*/
public void onChildEndpointFail(Endpoint endpoint, MessageContext synMessageContext) {
- endpoint.setActive(false);
- Object o = synMessageContext.getFaultStack().pop();
- ((FaultHandler)o).handleFault(synMessageContext);
+
+ Object o = synMessageContext.getProperty(FIRST_MESSAGE_IN_SESSION);
+
+ if (o != null && Boolean.TRUE.equals(o)) {
+
+ // this is the first message. so unbind the sesion with failed endpoint and start
+ // new one by resending.
+ dispatcher.unbind(synMessageContext);
+ send(synMessageContext);
+
+ } else {
+
+ // session has already started. we can't failover.
+ informFailure(synMessageContext);
+ }
+ }
+
+ private void informFailure(MessageContext synMessageContext) {
+
+ if (parentEndpoint != null) {
+ parentEndpoint.onChildEndpointFail(this, synMessageContext);
+
+ } else {
+
+ Object o = synMessageContext.getFaultStack().pop();
+ if (o != null) {
+ ((FaultHandler) o).handleFault(synMessageContext);
+ }
+ }
+ }
+
+ private static void handleException(String msg) {
+ log.error(msg);
+ throw new SynapseException(msg);
}
}
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/WSDLEndpoint.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/WSDLEndpoint.java?view=diff&rev=523982&r1=523981&r2=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/WSDLEndpoint.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/WSDLEndpoint.java Fri Mar 30 01:15:29 2007
@@ -51,6 +51,18 @@
private String serviceName;
private String portName;
+ /**
+ * Leaf level endpoints will be suspended for the specified time by this variable, after a
+ * failure. If this is not explicitly set, endpoints will be suspended forever.
+ */
+ private long suspendOnFailDuration = Long.MAX_VALUE;
+
+ /**
+ * Time to recover a failed endpoint. Value of this is calculated when endpoint is set as
+ * failed by adding suspendOnFailDuration to current time.
+ */
+ private long recoverOn = Long.MAX_VALUE;
+
private boolean active = true;
private Endpoint parentEndpoint = null;
private EndpointDefinition endpointDefinition = null;
@@ -163,6 +175,14 @@
this.name = name;
}
+ public long getSuspendOnFailDuration() {
+ return suspendOnFailDuration;
+ }
+
+ public void setSuspendOnFailDuration(long suspendOnFailDuration) {
+ this.suspendOnFailDuration = suspendOnFailDuration;
+ }
+
public String getWsdlURI() {
return wsdlURI;
}
@@ -195,11 +215,39 @@
this.portName = portName;
}
- public boolean isActive() {
+ /**
+ * Checks if the endpoint is active (failed or not). If endpoint is in failed state and
+ * suspendOnFailDuration has elapsed, it will be set to active.
+ *
+ * @param synMessageContext MessageContext of the current message. This is not used here.
+ *
+ * @return true if endpoint is active. false otherwise.
+ */
+ public boolean isActive(MessageContext synMessageContext) {
+
+ if (!active) {
+ if (System.currentTimeMillis() > recoverOn) {
+ active = true;
+ }
+ }
+
return active;
}
- public void setActive(boolean active) {
+ /**
+ * Sets if endpoint active or not. if endpoint is set as failed (active = false), the recover on
+ * time is calculated so that it will be activated after the recover on time.
+ *
+ * @param active true if active. false otherwise.
+ *
+ * @param synMessageContext MessageContext of the current message. This is not used here.
+ */
+ public void setActive(boolean active, MessageContext synMessageContext) {
+
+ if (!active) {
+ recoverOn = System.currentTimeMillis() + suspendOnFailDuration;
+ }
+
this.active = active;
}
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/RoundRobin.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/RoundRobin.java?view=diff&rev=523982&r1=523981&r2=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/RoundRobin.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/RoundRobin.java Fri Mar 30 01:15:29 2007
@@ -50,12 +50,15 @@
int attempts = 0;
do {
- nextEndpoint = (Endpoint) endpoints.get(currentEPR);
+ // two successive clients could get the same endpoint if not synchronized.
+ synchronized(this) {
+ nextEndpoint = (Endpoint) endpoints.get(currentEPR);
- if(currentEPR == endpoints.size() - 1) {
- currentEPR = 0;
- } else {
- currentEPR++;
+ if(currentEPR == endpoints.size() - 1) {
+ currentEPR = 0;
+ } else {
+ currentEPR++;
+ }
}
attempts++;
@@ -63,7 +66,7 @@
return null;
}
- } while (!nextEndpoint.isActive());
+ } while (!nextEndpoint.isActive(synapseMessageContext));
return nextEndpoint;
}
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/Dispatcher.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/Dispatcher.java?view=diff&rev=523982&r1=523981&r2=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/Dispatcher.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/Dispatcher.java Fri Mar 30 01:15:29 2007
@@ -22,6 +22,13 @@
import org.apache.synapse.endpoints.Endpoint;
import org.apache.synapse.MessageContext;
+/**
+ * Defines the behavior of session dispatchers. There can be two dispatcher types. Server intiated
+ * session dispatchers and client initialted session dispatchers. In the former one, server generates
+ * the session ID and sends it to the client in the first RESPONSE. In the later case, client should
+ * generate the session ID and send it to the server in the first REQUEST. A dispatcher object will
+ * be created for each session affinity load balance endpoint.
+ */
public interface Dispatcher {
/**
@@ -35,13 +42,21 @@
public Endpoint getEndpoint(MessageContext synCtx);
/**
- * Updates the session maps for client initiated sessions. This should be called in
- * client -> esb -> server flow.
+ * Updates the session maps. This will be called in the first client -> synapse -> server flow
+ * for client initiated sessions. For server initiated sessions, this will be called in the first
+ * server -> synapse -> client flow.
*
* @param synCtx SynapseMessageContext
* @param endpoint Selected endpoint for this session.
*/
public void updateSession(MessageContext synCtx, Endpoint endpoint);
+
+ /**
+ * Removes the session belonging to the given message context.
+ *
+ * @param synCtx MessageContext containing an session ID.
+ */
+ public void unbind(MessageContext synCtx);
/**
* Determine whether the session supported by the implementing dispatcher is intiated by the
Added: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/HttpSessionDispatcher.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/HttpSessionDispatcher.java?view=auto&rev=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/HttpSessionDispatcher.java (added)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/HttpSessionDispatcher.java Fri Mar 30 01:15:29 2007
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.endpoints.dispatch;
+
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Collections;
+
+/**
+ * Dispatches sessions based on HTTP cookies. Session is initiated by the server in the first response
+ * when it sends "Set-Cookie" HTTP header with the session ID. For all successive messages client
+ * should send "Cookie" HTTP header with session ID send by the server.
+ */
+public class HttpSessionDispatcher implements Dispatcher {
+
+ /**
+ * Map to store session -> endpoint mappings. Synchronized map is used as this is accessed by
+ * multiple threds (e.g. multiple clients different sessions).
+ */
+ Map sessionMap = Collections.synchronizedMap(new HashMap());
+
+ /**
+ * Check if "Cookie" HTTP header is available. If so, check if that cookie is in the session map.
+ * If cookie is available, there is a session for this cookie. return the (server) endpoint for
+ * that session.
+ *
+ * @param synCtx MessageContext possibly containing a "Cookie" HTTP header.
+ *
+ * @return Endpoint Server endpoint for the given HTTP session.
+ */
+ public Endpoint getEndpoint(MessageContext synCtx) {
+
+ Endpoint endpoint = null;
+
+ org.apache.axis2.context.MessageContext axis2MessageContext =
+ ((Axis2MessageContext) synCtx).getAxis2MessageContext();
+
+ Object o = axis2MessageContext.getProperty("TRANSPORT_HEADERS");
+ if (o != null && o instanceof Map) {
+ Map headerMap = (Map) o;
+ Object cookie = headerMap.get("Cookie");
+
+ if (cookie != null) {
+ Object e = sessionMap.get(cookie);
+ if (e != null) {
+ endpoint = (Endpoint) e;
+ }
+ }
+ }
+
+ return endpoint;
+ }
+
+ /**
+ * Searches for "Set-Cookie" HTTP header in the message context. If found and that given
+ * session ID is not already in the session map update the session map by mapping the cookie
+ * to the endpoint.
+ *
+ * @param synCtx MessageContext possibly containing the "Set-Cookie" HTTP header.
+ * @param endpoint Endpoint to be mapped to the session.
+ */
+ public void updateSession(MessageContext synCtx, Endpoint endpoint) {
+
+ org.apache.axis2.context.MessageContext axis2MessageContext =
+ ((Axis2MessageContext) synCtx).getAxis2MessageContext();
+
+ Object o = axis2MessageContext.getProperty("TRANSPORT_HEADERS");
+ if (o != null && o instanceof Map) {
+ Map headerMap = (Map) o;
+ Object cookie = headerMap.get("Set-Cookie");
+
+ if (cookie != null) {
+ // synchronized to avoid possible replacement of sessions
+ synchronized(sessionMap) {
+ if (!sessionMap.containsKey(cookie)) {
+ sessionMap.put(cookie, endpoint);
+ }
+ }
+ }
+ }
+ }
+
+ public void unbind(MessageContext synCtx) {
+
+ org.apache.axis2.context.MessageContext axis2MessageContext =
+ ((Axis2MessageContext) synCtx).getAxis2MessageContext();
+
+ Object o = axis2MessageContext.getProperty("TRANSPORT_HEADERS");
+ if (o != null && o instanceof Map) {
+ Map headerMap = (Map) o;
+ Object cookie = headerMap.get("Cookie");
+
+ if (cookie != null) {
+ sessionMap.remove(cookie);
+ }
+ }
+ }
+
+ /**
+ * HTTP sessions are initiated by the server.
+ *
+ * @return true
+ */
+ public boolean isServerInitiatedSession() {
+ return true;
+ }
+}
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SimpleClientSessionDispatcher.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SimpleClientSessionDispatcher.java?view=diff&rev=523982&r1=523981&r2=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SimpleClientSessionDispatcher.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SimpleClientSessionDispatcher.java Fri Mar 30 01:15:29 2007
@@ -27,16 +27,21 @@
import javax.xml.namespace.QName;
import java.util.Map;
import java.util.HashMap;
+import java.util.Collections;
/**
* This dispatcher is implemented to demonstrate a sample client session. It will detect sessions
* based on the <syn:ClientID xmlns:syn="http://ws.apache.org/namespaces/synapse"> soap header of the
* request message. Therefore, above header has to be included in the request soap messages by the
- * client who want to initiate and maintain a session.
+ * client who wants to initiate and maintain a session.
*/
public class SimpleClientSessionDispatcher implements Dispatcher {
- private Map sessionMap = new HashMap();
+ /**
+ * Map to store session -> endpoint mappings. Synchronized map is used as this is accessed by
+ * multiple threds (e.g. multiple clients different sessions).
+ */
+ private Map sessionMap = Collections.synchronizedMap(new HashMap());
public Endpoint getEndpoint(MessageContext synCtx) {
@@ -46,9 +51,9 @@
OMElement csID = header.getFirstChildWithName(
new QName("http://ws.apache.org/namespaces/synapse", "ClientID", "syn"));
if(csID != null && csID.getText() != null) {
- if (sessionMap.containsKey(csID.getText())) {
- Endpoint endpoint = (Endpoint)sessionMap.get(csID.getText());
- return endpoint;
+ Object o = sessionMap.get(csID.getText());
+ if (o != null) {
+ return (Endpoint) o;
}
}
}
@@ -64,9 +69,25 @@
OMElement csID = header.getFirstChildWithName(
new QName("http://ws.apache.org/namespaces/synapse", "ClientID", "syn"));
if(csID != null && csID.getText() != null) {
- if (!sessionMap.containsKey(csID.getText())) {
- sessionMap.put(csID.getText(), endpoint);
+ // synchronized to avoid possible replacement of sessions
+ synchronized(sessionMap) {
+ if (!sessionMap.containsKey(csID.getText())) {
+ sessionMap.put(csID.getText(), endpoint);
+ }
}
+ }
+ }
+ }
+
+ public void unbind(MessageContext synCtx) {
+
+ SOAPHeader header = synCtx.getEnvelope().getHeader();
+
+ if(header != null) {
+ OMElement csID = header.getFirstChildWithName(
+ new QName("http://ws.apache.org/namespaces/synapse", "ClientID", "syn"));
+ if(csID != null && csID.getText() != null) {
+ sessionMap.remove(csID.getText());
}
}
}
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SoapSessionDispatcher.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SoapSessionDispatcher.java?view=diff&rev=523982&r1=523981&r2=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SoapSessionDispatcher.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SoapSessionDispatcher.java Fri Mar 30 01:15:29 2007
@@ -30,10 +30,15 @@
import javax.xml.namespace.QName;
import java.util.HashMap;
import java.util.Map;
+import java.util.Collections;
public class SoapSessionDispatcher implements Dispatcher {
- private Map sessionMap = new HashMap();
+ /**
+ * Map to store session -> endpoint mappings. Synchronized map is used as this is accessed by
+ * multiple threds (e.g. multiple clients different sessions).
+ */
+ private Map sessionMap = Collections.synchronizedMap(new HashMap());
/**
* Gives the endpoint based on the service group context ID of the request message.
@@ -52,9 +57,13 @@
if(header != null) {
OMElement sgcID = header.getFirstChildWithName(
new QName("http://ws.apache.org/namespaces/axis2", "ServiceGroupId", "axis2"));
+
if(sgcID != null && sgcID.getText() != null) {
- if (sessionMap.containsKey(sgcID.getText())) {
- endpoint = (Endpoint)sessionMap.get(sgcID);
+
+ Object e = sessionMap.get(sgcID.getText());
+
+ if (e != null) {
+ endpoint = (Endpoint) e;
}
}
}
@@ -90,10 +99,26 @@
OMElement sgcID = referenceParameters.getFirstChildWithName(new QName(
"http://ws.apache.org/namespaces/axis2", "ServiceGroupId", "axis2"));
- if(!sessionMap.containsKey(sgcID)) {
- sessionMap.put(sgcID.getText(), endpoint);
+ // synchronized to avoid possible replacement of sessions
+ synchronized(sessionMap) {
+ if(!sessionMap.containsKey(sgcID.getText())) {
+ sessionMap.put(sgcID.getText(), endpoint);
+ }
}
}
+ }
+ }
+ }
+
+ public void unbind(MessageContext synCtx) {
+
+ SOAPHeader header = synCtx.getEnvelope().getHeader();
+
+ if(header != null) {
+ OMElement sgcID = header.getFirstChildWithName(
+ new QName("http://ws.apache.org/namespaces/axis2", "ServiceGroupId", "axis2"));
+ if(sgcID != null && sgcID.getText() != null) {
+ sessionMap.remove(sgcID.getText());
}
}
}
Modified: webservices/synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/SendMediatorSerializationTest.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/SendMediatorSerializationTest.java?view=diff&rev=523982&r1=523981&r2=523982
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/SendMediatorSerializationTest.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/config/xml/SendMediatorSerializationTest.java Fri Mar 30 01:15:29 2007
@@ -31,6 +31,7 @@
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import java.util.ArrayList;
+import java.util.List;
import java.io.StringReader;
public class SendMediatorSerializationTest extends AbstractTestCase {
@@ -77,7 +78,7 @@
send2.getEndpoint() instanceof LoadbalanceEndpoint);
LoadbalanceEndpoint endpoint = (LoadbalanceEndpoint) send2.getEndpoint();
- ArrayList addresses = endpoint.getEndpoints();
+ List addresses = endpoint.getEndpoints();
assertEquals("There should be 3 leaf level address endpoints", addresses.size(), 3);
assertTrue("Leaf level endpoints should be address endpoints",
@@ -126,7 +127,7 @@
send2.getEndpoint() instanceof FailoverEndpoint);
FailoverEndpoint endpoint = (FailoverEndpoint) send2.getEndpoint();
- ArrayList addresses = endpoint.getEndpoints();
+ List addresses = endpoint.getEndpoints();
assertEquals("There should be 3 leaf level address endpoints", addresses.size(), 3);
assertTrue("Leaf level endpoints should be address endpoints",
@@ -180,7 +181,7 @@
LoadbalanceEndpoint loadbalanceEndpoint = (LoadbalanceEndpoint) send2.getEndpoint();
- ArrayList children = loadbalanceEndpoint.getEndpoints();
+ List children = loadbalanceEndpoint.getEndpoints();
assertEquals("Top level endpoint should have 2 child endpoints.", children.size(), 2);
assertTrue("First child should be a address endpoint",
@@ -190,7 +191,7 @@
children.get(1) instanceof FailoverEndpoint);
FailoverEndpoint failoverEndpoint = (FailoverEndpoint) children.get(1);
- ArrayList children2 = failoverEndpoint.getEndpoints();
+ List children2 = failoverEndpoint.getEndpoints();
assertEquals("Fail over endpoint should have 2 children.", children2.size(), 2);
assertTrue("Children of the fail over endpoint should be address endpoints.",
---------------------------------------------------------------------
To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: synapse-dev-help@ws.apache.org