You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by as...@apache.org on 2008/10/07 20:38:47 UTC
svn commit: r702579 [4/6] - in /synapse/trunk/java/modules:
core/src/main/java/org/apache/synapse/
core/src/main/java/org/apache/synapse/config/
core/src/main/java/org/apache/synapse/config/xml/
core/src/main/java/org/apache/synapse/config/xml/endpoint...
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/SALoadbalanceEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/SALoadbalanceEndpoint.java?rev=702579&r1=702578&r2=702579&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/SALoadbalanceEndpoint.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/SALoadbalanceEndpoint.java Tue Oct 7 11:38:44 2008
@@ -19,22 +19,18 @@
package org.apache.synapse.endpoints;
-import org.apache.axis2.clustering.ClusterManager;
import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.OperationContext;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.synapse.FaultHandler;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseConstants;
-import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.endpoints.algorithms.AlgorithmContext;
import org.apache.synapse.endpoints.algorithms.LoadbalanceAlgorithm;
import org.apache.synapse.endpoints.dispatch.Dispatcher;
-import org.apache.synapse.endpoints.dispatch.DispatcherContext;
+import org.apache.synapse.endpoints.dispatch.SALSessions;
+import org.apache.synapse.endpoints.dispatch.SessionInformation;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
/**
* SALoadbalanceEndpoint supports session affinity based load balancing. Each of this endpoint
@@ -55,443 +51,281 @@
* send(...) method of that endpoint. If not it will find an endpoint using the load balancing
* policy and send to that endpoint.
*/
-public class SALoadbalanceEndpoint implements Endpoint {
-
- private static final Log log = LogFactory.getLog(SALoadbalanceEndpoint.class);
-
- private static final String FIRST_MESSAGE_IN_SESSION = "first_message_in_session";
- public static final String ENDPOINT_LIST = "endpointList";
- public static final String ROOT_ENDPOINT = "rootendpoint";
- public static final String ENDPOINT_NAME_LIST = "endpointNameList";
- public static final String WARN_MESSAGE = "In a clustering environment, the endpoint " +
- "name should be specified even for anonymous endpoints. Otherwise the clustering " +
- "would not function properly, if there are more than one anonymous endpoints.";
-
- /**
- * Name of the endpoint. Used for named endpoints which can be referred using the key attribute
- * of indirect endpoints.
- */
- private String name = null;
-
- /**
- * List of endpoints among which the load is distributed. Any object implementing the Endpoint
- * interface could be used.
- */
- private List<Endpoint> endpoints = null;
-
- /**
- * Algorithm used for selecting the next endpoint to direct the first request of sessions.
- * Default is RoundRobin.
- */
- private LoadbalanceAlgorithm algorithm = null;
-
- /**
- * Parent endpoint of this endpoint if this used inside another endpoint. Although any endpoint
- * can be the parent, only SALoadbalanceEndpoint should be used here. Use of any other endpoint
- * would invalidate the session.
- */
- private Endpoint parentEndpoint = null;
-
+public class SALoadbalanceEndpoint extends LoadbalanceEndpoint {
+
/**
* Dispatcher used for session affinity.
*/
private Dispatcher dispatcher = null;
- /**
- * The dispatcher context, place holder for keeping any runtime states that are used when
- * finding endpoint for the session
- */
- private final DispatcherContext dispatcherContext = new DispatcherContext();
-
- /**
- * The endpoint context, place holder for keeping any runtime states related to the endpoint
- */
- private final EndpointContext endpointContext = new EndpointContext();
-
- /**
- * The algorithm context, place holder for keeping any runtime states related to the load
- * balance algorithm
- */
- private final AlgorithmContext algorithmContext = new AlgorithmContext();
-
-
- public void send(MessageContext synMessageContext) {
-
- if (log.isDebugEnabled()) {
- log.debug("Start : Session Affinity Load-balance Endpoint " + name);
- }
- boolean isClusteringEnable = false;
- // get Axis2 MessageContext and ConfigurationContext
- org.apache.axis2.context.MessageContext axisMC =
- ((Axis2MessageContext) synMessageContext).getAxis2MessageContext();
- ConfigurationContext cc = axisMC.getConfigurationContext();
-
- //The check for clustering environment
- ClusterManager clusterManager = cc.getAxisConfiguration().getClusterManager();
- if (clusterManager != null &&
- clusterManager.getContextManager() != null) {
- isClusteringEnable = true;
- }
-
- String endpointName = this.getName();
- if (endpointName == null) {
- if (isClusteringEnable) {
- log.warn(WARN_MESSAGE);
- }
- if (log.isDebugEnabled()) {
- log.debug("Using the name for the anonymous endpoint as : '"
- + SynapseConstants.ANONYMOUS_ENDPOINT + "'");
- }
- endpointName = SynapseConstants.ANONYMOUS_ENDPOINT;
- }
+ /* Session timeout interval */
+ private long sessionTimeout = -1;
- if (isClusteringEnable) {
+ public void init(ConfigurationContext cc) {
- // if this is a cluster environment, then set configuration context to endpoint context
- if (endpointContext.getConfigurationContext() == null) {
+ if (!initialized) {
- if (log.isDebugEnabled()) {
- log.debug("Setting the ConfigurationContext to " +
- "the EndpointContext with the name " + endpointName +
- " for replicating data on the cluster");
- }
- endpointContext.setConfigurationContext(cc);
- endpointContext.setContextID(endpointName);
+ super.init(cc);
+ // Initialize the SAL sessions if they have not already been initialized.
+ SALSessions salSessions = SALSessions.getInstance();
+ if (!salSessions.isInitialized()) {
+ salSessions.initialize(isClusteringEnabled, cc);
}
- // if this is a cluster environment, then set configuration context to load balance
- // algorithm context
- if (algorithmContext.getConfigurationContext() == null) {
-
- if (log.isDebugEnabled()) {
- log.debug("Setting the ConfigurationContext to " +
- "the AlgorithmContext with the name " + endpointName +
- " for replicating data on the cluster");
- }
- algorithmContext.setConfigurationContext(cc);
- algorithmContext.setContextID(endpointName);
+ // For each root-level SAL endpoint, all children are registered.
+ // This is for clustering, as in clustering only endpoint names are replicated
+ // and it needs a way to pick endpoints by name
+ if (isClusteringEnabled && (this.getParentEndpoint() == null ||
+ !(this.getParentEndpoint() instanceof SALoadbalanceEndpoint))) {
+ SALSessions.getInstance().registerChildren(this, getChildren());
}
- // if this is a cluster environment, then set configuration context to session based
- // endpoint dispatcher
- if (dispatcherContext.getConfigurationContext() == null) {
+ }
+ }
- if (log.isDebugEnabled()) {
- log.debug("Setting the ConfigurationContext to " +
- "the DispatcherContext with the name " + endpointName +
- " for replicating data on the cluster");
- }
- dispatcherContext.setConfigurationContext(cc);
- dispatcherContext.setContextID(endpointName);
+ public void send(MessageContext synCtx) {
- if (log.isDebugEnabled()) {
- log.debug("Setting the endpoints to the DispatcherContext : " + endpoints);
- }
- dispatcherContext.setEndpoints(endpoints);
- }
+ if (log.isDebugEnabled()) {
+ log.debug("Start : Session Affinity Load-balance Endpoint " + getName());
}
-
// first check if this message is associated with a session. if so, get the endpoint
// associated with that session.
- Endpoint endpoint = dispatcher.getEndpoint(synMessageContext, dispatcherContext);
- if (endpoint == null) {
-
- // there is no endpoint associated with this session. get a new endpoint using the
- // load balance policy.
- endpoint = algorithm.getNextEndpoint(synMessageContext, algorithmContext);
- // this is a start of a new session. so update session map.
- if (dispatcher.isServerInitiatedSession()) {
+ SessionInformation sessionInformation =
+ (SessionInformation) synCtx.getProperty(
+ SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION);
- if (log.isDebugEnabled()) {
- log.debug("Adding a new server initiated session for the current message");
- }
+ List<Endpoint> endpoints = (List<Endpoint>) synCtx.getProperty(
+ SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_ENDPOINT_LIST);
- // add this endpoint to the endpoint sequence of operation context.
- Axis2MessageContext axis2MsgCtx = (Axis2MessageContext) synMessageContext;
- OperationContext opCtx = axis2MsgCtx.getAxis2MessageContext().getOperationContext();
-
- if (isClusteringEnable) {
- // If running on a cluster keep endpoint names, because it is heavy to
- // replicate endpoint itself
-
- Object o = opCtx.getPropertyNonReplicable(ENDPOINT_NAME_LIST);
- List<String> epNameList;
- if (o instanceof List) {
- epNameList = (List<String>) o;
- epNameList.add(endpointName);
- } else {
- // this is the first endpoint in the heirachy. so create the queue and
- // insert this as the first element.
- epNameList = new ArrayList<String>();
- epNameList.add(endpointName);
- opCtx.setNonReplicableProperty(ROOT_ENDPOINT, this);
- }
-
- // if the next endpoint is not a session affinity one, endpoint sequence ends
- // here. but we have to add the next endpoint to the list.
- if (!(endpoint instanceof SALoadbalanceEndpoint)) {
-
- String name;
- if (endpoint instanceof IndirectEndpoint) {
- name = ((IndirectEndpoint) endpoint).getKey();
- } else {
- name = endpoint.getName();
- }
+ if (sessionInformation == null && endpoints == null) {
- if (name == null) {
- log.warn(WARN_MESSAGE);
- name = SynapseConstants.ANONYMOUS_ENDPOINT;
- }
- epNameList.add(name);
- }
+ sessionInformation = dispatcher.getSession(synCtx);
+ if (sessionInformation != null) {
- if (log.isDebugEnabled()) {
- log.debug("Operating on a cluster. Setting the endpoint name list to " +
- "the OperationContext : " + epNameList);
- }
- opCtx.setProperty(ENDPOINT_NAME_LIST, epNameList);
-
- } else {
-
- Object o = opCtx.getProperty(ENDPOINT_LIST);
- List<Endpoint> endpointList;
- if (o instanceof List) {
- endpointList = (List<Endpoint>) o;
- endpointList.add(this);
- } else {
- // this is the first endpoint in the heirachy. so create the queue and
- // insert this as the first element.
- endpointList = new ArrayList<Endpoint>();
- endpointList.add(this);
- opCtx.setProperty(ENDPOINT_LIST, endpointList);
- }
-
- // if the next endpoint is not a session affinity one, endpoint sequence ends
- // here. but we have to add the next endpoint to the list.
- if (!(endpoint instanceof SALoadbalanceEndpoint)) {
- endpointList.add(endpoint);
- }
+ if (log.isDebugEnabled()) {
+ log.debug("Current session id : " + sessionInformation.getId());
}
-
- } else {
- dispatcher.updateSession(synMessageContext, dispatcherContext, endpoint);
- }
-
- // this is the first request. so an endpoint has not been bound to this session and we
- // are free to failover if the currently selected endpoint is not working. but for
- // failover to work, we have to build the soap envelope.
- synMessageContext.getEnvelope().build();
-
- // we should also indicate that this is the first message in the session. so that
- // onFault(...) method can resend only the failed attempts for the first message.
- synMessageContext.setProperty(FIRST_MESSAGE_IN_SESSION, Boolean.TRUE);
- }
-
- if (endpoint != null) {
-
- // endpoints given by session dispatchers may not be active. therefore, we have check
- // it here.
- if (endpoint.isActive(synMessageContext)) {
+ endpoints =
+ dispatcher.getEndpoints(sessionInformation);
if (log.isDebugEnabled()) {
- log.debug("Using the endpoint on the session with "
- + ((endpoint instanceof IndirectEndpoint) ? "key : "
- + ((IndirectEndpoint) endpoint).getKey() : "name : "
- + endpoint.getName()) + " for sending the message");
+ log.debug("Endpoint sequence (path) on current session : " + this + endpoints);
}
- endpoint.send(synMessageContext);
- } else {
- informFailure(synMessageContext);
+
+ synCtx.setProperty(
+ SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_ENDPOINT_LIST, endpoints);
+ // This is for reliably recovering the session information if, while the response is
+ // being received, the session information has been removed by the cleaner.
+ // This is not costly, as session information is not a heavy data structure
+ synCtx.setProperty(
+ SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION, sessionInformation);
}
+ }
+ if (sessionInformation != null && endpoints != null) {
+ //send message on current session
+ sendMessageOnCurrentSession(sessionInformation.getId(), endpoints, synCtx);
} else {
-
- // all child endpoints have failed. so mark this also as failed.
- if (log.isDebugEnabled()) {
- log.debug("Marking the Endpoint as failed, " +
- "because all child endpoints has been failed");
- }
- setActive(false, synMessageContext);
- informFailure(synMessageContext);
+ // prepare for a new session
+ sendMessageOnNewSession(synCtx);
}
+ }
+
+ public Dispatcher getDispatcher() {
+ return dispatcher;
+ }
+
+ public void setDispatcher(Dispatcher dispatcher) {
+ this.dispatcher = dispatcher;
}
/**
- * This will be called for the response of the first message of each server initiated session.
+ * It is logically incorrect to failover a session affinity endpoint after the session has started.
+ * If we redirect a message belonging to a particular session, new endpoint is not aware of the
+ * session. So we can't handle anything more at the endpoint level. Therefore, this method just
+ * deactivates the failed endpoint and gives the fault to the next fault handler.
+ * <p/>
+ * But if the session has not started (i.e. first message), the message will be resent by binding
+ * it to a different endpoint.
*
- * @param responseMsgCtx
- * @param endpointList
- * @param isClusteringEnable
+ * @param endpoint Failed endpoint.
+ * @param synCtx MessageContext of the failed message.
*/
- public void updateSession(MessageContext responseMsgCtx, List endpointList,
- boolean isClusteringEnable) {
+ public void onChildEndpointFail(Endpoint endpoint, MessageContext synCtx) {
- Endpoint endpoint = null;
+ Object o = synCtx.getProperty(
+ SynapseConstants.PROP_SAL_ENDPOINT_FIRST_MESSAGE_IN_SESSION);
- if (isClusteringEnable) {
- // if this is a clustering env. only keep endpoint names, because, it is heavy to
- // replicate endpoint itself
- String epNameObj = (String) endpointList.remove(0);
- for (Endpoint ep : endpoints) {
- if (ep != null) {
-
- String name;
- if (ep instanceof IndirectEndpoint) {
- name = ((IndirectEndpoint) ep).getKey();
- } else {
- name = ep.getName();
- }
+ if (o != null && Boolean.TRUE.equals(o)) {
- if (name != null && name.equals(epNameObj)) {
- endpoint = ep;
- break;
+ // this is the first message. so unbind the session with failed endpoint and start
+ // new one by resending.
+
+ dispatcher.unbind(synCtx);
+
+ // As a retry is going to happen, we have to remove the state related to the previous try
+
+ Object epListObj = synCtx.getProperty(SynapseConstants.PROP_SAL_ENDPOINT_ENDPOINT_LIST);
+ if (epListObj instanceof List) {
+ List<Endpoint> endpointList = (List<Endpoint>) epListObj;
+ if (!endpointList.isEmpty()) {
+ if (endpointList.get(0) == this) {
+ endpointList.clear();
+ } else {
+ if (endpointList.contains(this)) {
+ int lastIndex = endpointList.indexOf(this);
+ List<Endpoint> head =
+ endpointList.subList(lastIndex , endpointList.size());
+ head.clear();
+ }
}
}
}
+
+ send(synCtx);
} else {
- endpoint = (Endpoint) endpointList.remove(0);
- }
-
- if (endpoint != null) {
-
- dispatcher.updateSession(responseMsgCtx, dispatcherContext, endpoint);
- if (endpoint instanceof SALoadbalanceEndpoint) {
- ((SALoadbalanceEndpoint) endpoint).updateSession(
- responseMsgCtx, endpointList, isClusteringEnable);
- }
+ // session has already started. we can't failover.
+ informFailure(synCtx, SynapseConstants.ENDPOINT_SAL_FAILED_SESSION,
+ "Failure an endpoint " + endpoint + " in the current session");
}
}
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name.trim();
- }
-
- public LoadbalanceAlgorithm getAlgorithm() {
- return algorithm;
- }
-
- public void setAlgorithm(LoadbalanceAlgorithm algorithm) {
- this.algorithm = algorithm;
- }
-
- /**
- * This is active in below conditions:
- * If a session is not started AND at least one child endpoint is active.
- * If a session is started AND the binding endpoint is active.
- * <p/>
- * This is not active for all other conditions.
- *
- * @param synMessageContext MessageContext of the current message. This is used to determine the
- * session.
- * @return true is active. false otherwise.
+ /*
+ * Helper method that sends the message along the endpoint sequence of the current session
*/
- public boolean isActive(MessageContext synMessageContext) {
- // todo: implement above
- boolean active;
- Endpoint endpoint = dispatcher.getEndpoint(synMessageContext, dispatcherContext);
- if (endpoint == null) { // If a session is not started
- active = endpointContext.isActive();
- if (!active && endpoints != null) {
- for (Endpoint ep : endpoints) {
- if (ep != null) {
- active = ep.isActive(synMessageContext);
- if (active) { //AND at least one child endpoint is active
- endpointContext.setActive(active);
- // don't break the loop though we found one active endpoint. calling isActive()
- // on all child endpoints will update their active state. so this is a good
- // time to do that.
+ private void sendMessageOnCurrentSession(String sessionID, List<Endpoint> endpoints, MessageContext synCtx) {
+
+ // get the next endpoint in the endpoint sequence
+ Endpoint endpoint = null;
+
+ boolean invalidSequence = false;
+ if (endpoints.isEmpty()) {
+ invalidSequence = true;
+ } else {
+ if (endpoints.contains(this)) {
+ // This situation arises only if this endpoint is referred to as an indirect endpoint.
+ // The whole path before this SAL endpoint is ignored.
+ int length = endpoints.size();
+ if (length > 1) {
+
+ int beginIndex = endpoints.lastIndexOf(this) + 1;
+ if (beginIndex == length) {
+ invalidSequence = true;
+ } else {
+ endpoints = endpoints.subList(beginIndex, length);
+ if (!endpoints.isEmpty()) {
+ endpoint = endpoints.remove(0);
+ } else {
+ invalidSequence = true;
}
}
+ } else {
+ invalidSequence = true;
}
- }
- } else {
- //If a session is started AND the binding endpoint is active.
- active = endpoint.isActive(synMessageContext);
- if (active) {
- endpointContext.setActive(active);
+
+ } else {
+ endpoint = endpoints.remove(0);
}
}
- if (log.isDebugEnabled()) {
- log.debug("SALoadbalanceEndpoint with name '" + getName() + "' is in "
- + (active ? "active" : "inactive") + " state");
+ if (invalidSequence) {
+ informFailure(synCtx, SynapseConstants.ENDPOINT_SAL_INVALID_PATH,
+ "Invalid endpoint sequence " + endpoints + " for session with id " + sessionID);
+ return;
+ }
+ // endpoints given by session dispatchers may not be active. therefore, we have to check
+ // it here.
+ if (endpoint != null && endpoint.readyToSend()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Using the endpoint " + endpoint + " for sending the message");
+ }
+ synCtx.pushFaultHandler(this);
+ endpoint.send(synCtx);
+ } else {
+ informFailure(synCtx, SynapseConstants.ENDPOINT_SAL_NOT_READY,
+ "The endpoint " + endpoint + " on the session with id " +
+ sessionID + " is not ready.");
}
-
- return active;
}
- public void setActive(boolean active, MessageContext synMessageContext) {
- endpointContext.setActive(active);
- }
+ /*
+ * Helper method that sends the message, hoping to establish a new session
+ */
+ private void sendMessageOnNewSession(MessageContext synCtx) {
- public List<Endpoint> getEndpoints() {
- return endpoints;
- }
+ // there is no endpoint associated with this session. get a new endpoint using the
+ // load balance policy.
+ Endpoint endpoint = getNextChild(synCtx);
+ if (endpoint == null) {
- public void setEndpoints(List<Endpoint> endpoints) {
- this.endpoints = endpoints;
- }
+ informFailure(synCtx, SynapseConstants.ENDPOINT_LB_NONE_READY,
+ "SLALoadbalance endpoint : " + getName() + " - no ready child endpoints");
+ } else {
- public void setParentEndpoint(Endpoint parentEndpoint) {
- this.parentEndpoint = parentEndpoint;
- }
+ prepareEndPointSequence(synCtx, endpoint);
- public Dispatcher getDispatcher() {
- return dispatcher;
- }
+ // this is the first request. so an endpoint has not been bound to this session and we
+ // are free to failover if the currently selected endpoint is not working. but for
+ // failover to work, we have to build the soap envelope.
+ synCtx.getEnvelope().build();
- public void setDispatcher(Dispatcher dispatcher) {
- this.dispatcher = dispatcher;
+ // we should also indicate that this is the first message in the session. so that
+ // onFault(...) method can resend only the failed attempts for the first message.
+ synCtx.setProperty(SynapseConstants.PROP_SAL_ENDPOINT_FIRST_MESSAGE_IN_SESSION, Boolean.TRUE);
+ synCtx.pushFaultHandler(this);
+ endpoint.send(synCtx);
+ }
}
- /**
- * It is logically incorrect to failover a session affinity endpoint after the session has started.
- * If we redirect a message belonging to a particular session, new endpoint is not aware of the
- * session. So we can't handle anything more at the endpoint level. Therefore, this method just
- * deactivate the failed endpoint and give the fault to the next fault handler.
- * <p/>
- * But if the session has not started (i.e. first message), the message will be resend by binding
- * it to a different endpoint.
- *
- * @param endpoint Failed endpoint.
- * @param synMessageContext MessageContext of the failed message.
- */
- public void onChildEndpointFail(Endpoint endpoint, MessageContext synMessageContext) {
+ private void informFailure(MessageContext synCtx, int errorCode, String errorMsg) {
- Object o = synMessageContext.getProperty(FIRST_MESSAGE_IN_SESSION);
+ if (synCtx.getProperty(SynapseConstants.LAST_ENDPOINT) == null) {
+ synCtx.setProperty(SynapseConstants.ERROR_CODE, errorCode);
+ synCtx.setProperty(SynapseConstants.ERROR_MESSAGE, errorMsg);
+ synCtx.setProperty(SynapseConstants.ERROR_DETAIL, errorMsg);
+ synCtx.setProperty(SynapseConstants.ERROR_EXCEPTION, null);
+ }
+ super.onFault(synCtx);
+ }
- if (o != null && Boolean.TRUE.equals(o)) {
+ /*
+ * Prepares the endpoint sequence for a request that establishes a new session
+ */
+ private void prepareEndPointSequence(MessageContext synCtx, Endpoint endpoint) {
- // this is the first message. so unbind the sesion with failed endpoint and start
- // new one by resending.
- dispatcher.unbind(synMessageContext, dispatcherContext);
- send(synMessageContext);
+ Object o = synCtx.getProperty(SynapseConstants.PROP_SAL_ENDPOINT_ENDPOINT_LIST);
+ List<Endpoint> endpointList;
+ if (o instanceof List) {
+ endpointList = (List<Endpoint>) o;
+ endpointList.add(this);
} else {
-
- // session has already started. we can't failover.
- informFailure(synMessageContext);
+ // this is the first endpoint in the hierarchy. so create the queue and
+ // insert this as the first element.
+ endpointList = new ArrayList<Endpoint>();
+ endpointList.add(this);
+ synCtx.setProperty(SynapseConstants.PROP_SAL_ENDPOINT_ENDPOINT_LIST, endpointList);
+ synCtx.setProperty(SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_DISPATCHER, dispatcher);
}
- }
-
- private void informFailure(MessageContext synMessageContext) {
- log.warn("Failed to send using the selected endpoint, becasue it is inactive");
-
- if (parentEndpoint != null) {
- parentEndpoint.onChildEndpointFail(this, synMessageContext);
- } else {
- Object o = synMessageContext.getFaultStack().pop();
- if (o != null) {
- ((FaultHandler) o).handleFault(synMessageContext);
+ // if the next endpoint is not a session affinity one, endpoint sequence ends
+ // here. but we have to add the next endpoint to the list.
+ if (!(endpoint instanceof SALoadbalanceEndpoint)) {
+ endpointList.add(endpoint);
+ // Clear out any session information associated with the current message
+ if (dispatcher.isServerInitiatedSession()) {
+ dispatcher.removeSessionID(synCtx);
}
}
}
+ public long getSessionTimeout() {
+ return sessionTimeout;
+ }
+
+ public void setSessionTimeout(long sessionTimeout) {
+ this.sessionTimeout = sessionTimeout;
+ }
}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/WSDLEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/WSDLEndpoint.java?rev=702579&r1=702578&r2=702579&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/WSDLEndpoint.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/WSDLEndpoint.java Tue Oct 7 11:38:44 2008
@@ -22,154 +22,38 @@
import org.apache.axiom.om.OMElement;
import org.apache.axis2.clustering.ClusterManager;
import org.apache.axis2.context.ConfigurationContext;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.synapse.FaultHandler;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.core.axis2.Axis2MessageContext;
-import org.apache.synapse.endpoints.utils.EndpointDefinition;
-
-import java.util.Stack;
/**
- * WSDLEndpoint represents the endpoints built using a wsdl document. It stores the details about
- * the endpoint in a EndpointDefinition object. Once the WSDLEndpoint object is constructed, it
- * should not access the wsdl document at runtime to obtain endpoint information. If it is necessary
- * to create an endpoint using a dynamic wsdl, store the endpoint configuration in the registry and
- * create a dynamic wsdl endpoint using that registry key.
+ * WSDLEndpoint represents the endpoints built using a WSDL document. It stores the details about
+ * the endpoint in an EndpointDefinition object. Once the WSDLEndpoint object is constructed, it
+ * should not access the WSDL document at runtime to obtain endpoint information. If it is necessary
+ * to create an endpoint using a dynamic WSDL, store the endpoint configuration in the registry and
+ * create a dynamic WSDL endpoint using that registry key.
* <p/>
* TODO: This should allow various policies to be applied on fine grained level (e.g. operations).
*/
-public class WSDLEndpoint extends FaultHandler implements Endpoint {
-
- private static final Log log = LogFactory.getLog(WSDLEndpoint.class);
- private static final Log trace = LogFactory.getLog(SynapseConstants.TRACE_LOGGER);
+public class WSDLEndpoint extends AbstractEndpoint {
- private String name = null;
private String wsdlURI;
private OMElement wsdlDoc;
private String serviceName;
private String portName;
- private Endpoint parentEndpoint = null;
- private EndpointDefinition endpoint = null;
-
- /**
- * The endpoint context , place holder for keep any runtime states related to the endpoint
- */
- private final EndpointContext endpointContext = new EndpointContext();
-
- /**
- * Sends the message through this endpoint. This method just handles statistics related
- * functions and gives the message to the Synapse environment to send. It does not add any
- * endpoint specific details to the message context. These details are added only to the cloned
- * message context by the Axis2FlexibleMepClient. So that we can reuse the original message
- * context for resending through different endpoints.
- *
- * @param synCtx MessageContext sent by client to Synapse
- */
- public void send(MessageContext synCtx) {
-
- boolean traceOn = isTraceOn(synCtx);
- boolean traceOrDebugOn = isTraceOrDebugOn(traceOn);
-
- if (traceOrDebugOn) {
- traceOrDebug(traceOn, "Start : WSDL Endpoint");
-
- if (traceOn && trace.isTraceEnabled()) {
- trace.trace("Message : " + synCtx.getEnvelope());
- }
- }
-
- if (endpoint.getAddress() != null) {
-
- String eprAddress = endpoint.getAddress();
- boolean isClusteringEnable = false;
- // get Axis2 MessageContext and ConfigurationContext
- org.apache.axis2.context.MessageContext axisMC =
- ((Axis2MessageContext) synCtx).getAxis2MessageContext();
- ConfigurationContext cc = axisMC.getConfigurationContext();
-
- //The check for clustering environment
- ClusterManager clusterManager = cc.getAxisConfiguration().getClusterManager();
- if (clusterManager != null &&
- clusterManager.getContextManager() != null) {
- isClusteringEnable = true;
- }
-
- String endPointName = this.getName();
- if (endPointName == null) {
-
- if (traceOrDebugOn && isClusteringEnable) {
- log.warn("In a clustering environment , the endpoint name should be " +
- "specified even for anonymous endpoints. Otherwise, the clustering " +
- "would not be functioned correctly if there are more than one " +
- "anonymous endpoints. ");
- }
- endPointName = SynapseConstants.ANONYMOUS_ENDPOINT;
- }
-
- if (isClusteringEnable) {
- // if this is a cluster environment , then set configuration context
- // to endpoint context
- if (endpointContext.getConfigurationContext() == null) {
- endpointContext.setConfigurationContext(cc);
- endpointContext.setContextID(endPointName);
- }
- }
-
- // Setting Required property to collect the End Point audit
-
- if (traceOrDebugOn) {
- traceOrDebug(traceOn, "Sending message to WSDL endpoint : " +
- endPointName + " resolves to address = " + eprAddress);
- traceOrDebug(traceOn, "SOAPAction: " + (synCtx.getSoapAction() != null ?
- synCtx.getSoapAction() : "null"));
- traceOrDebug(traceOn, "WSA-Action: " + (synCtx.getWSAAction() != null ?
- synCtx.getWSAAction() : "null"));
-
- if (traceOn && trace.isTraceEnabled()) {
- trace.trace("Envelope : \n" + synCtx.getEnvelope());
- }
- }
-
- // register this as the immediate fault handler for this message.
- synCtx.pushFaultHandler(this);
-
- // add this as the last endpoint to process this message. it is used by audit code.
- synCtx.setProperty(SynapseConstants.PROCESSED_ENDPOINT, this);
-
- synCtx.getEnvironment().send(endpoint, synCtx);
- }
- }
-
public void onFault(MessageContext synCtx) {
- // perform retries here
-
- // if this endpoint has actually failed, inform the parent.
- setActive(false, synCtx);
-
- if (parentEndpoint != null) {
- parentEndpoint.onChildEndpointFail(this, synCtx);
- } else {
- Stack faultStack = synCtx.getFaultStack();
- if (!faultStack.isEmpty()) {
- ((FaultHandler) faultStack.pop()).handleFault(synCtx);
- }
+ // is this really a fault or a timeout/connection close etc?
+ if (isTimeout(synCtx)) {
+ getContext().onTimeout();
+ } else if (isSuspendFault(synCtx)) {
+ getContext().onFault();
}
+ super.onFault(synCtx);
}
- public void onChildEndpointFail(Endpoint endpoint, MessageContext synMessageContext) {
- // WSDLEndpoint does not contain any child endpoints. So this method will never be called.
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name.trim();
+ public void onSuccess() {
+ getContext().onSuccess();
}
public String getWsdlURI() {
@@ -203,103 +87,4 @@
public void setPortName(String portName) {
this.portName = portName;
}
-
- /**
- * Checks if the endpoint is active (failed or not). If endpoint is in failed state and
- * suspendOnFailDuration has elapsed, it will be set to active.
- *
- * @param synMessageContext MessageContext of the current message. This is not used here.
- * @return true if endpoint is active. false otherwise.
- */
- public boolean isActive(MessageContext synMessageContext) {
- boolean active = endpointContext.isActive();
- if (!active) {
- long recoverOn = endpointContext.getRecoverOn();
- if (System.currentTimeMillis() > recoverOn) {
- active = true;
- endpointContext.setActive(true);
- }
- }
-
- if (log.isDebugEnabled()) {
- log.debug("WSDLEndpoint with name '" + name + "' is in "
- + (active ? "active" : "inactive") + " state");
- }
-
- return active;
- }
-
- /**
- * Sets if endpoint active or not. if endpoint is set as failed (active = false), the recover on
- * time is calculated so that it will be activated after the recover on time.
- *
- * @param active true if active. false otherwise.
- * @param synMessageContext MessageContext of the current message. This is not used here.
- */
- public void setActive(boolean active, MessageContext synMessageContext) {
-
- if (!active) {
- if (endpoint.getSuspendOnFailDuration() != -1) {
- // Calculating a new value by adding suspendOnFailDuration to current time.
- // as the endpoint is set as failed
- endpointContext.setRecoverOn(
- System.currentTimeMillis() + endpoint.getSuspendOnFailDuration());
- } else {
- endpointContext.setRecoverOn(Long.MAX_VALUE);
- }
- }
-
- endpointContext.setActive(true);
- }
-
- public void setParentEndpoint(Endpoint parentEndpoint) {
- this.parentEndpoint = parentEndpoint;
- }
-
- public EndpointDefinition getEndpoint() {
- return endpoint;
- }
-
- public void setEndpoint(EndpointDefinition endpoint) {
- this.endpoint = endpoint;
- }
-
- /**
- * Should this mediator perform tracing? True if its explicitly asked to
- * trace, or its parent has been asked to trace and it does not reject it
- *
- * @param msgCtx the current message
- * @return true if tracing should be performed
- */
- protected boolean isTraceOn(MessageContext msgCtx) {
- return
- (endpoint.getTraceState() == SynapseConstants.TRACING_ON) ||
- (endpoint.getTraceState() == SynapseConstants.TRACING_UNSET &&
- msgCtx.getTracingState() == SynapseConstants.TRACING_ON);
- }
-
- /**
- * Is tracing or debug logging on?
- *
- * @param isTraceOn is tracing known to be on?
- * @return true, if either tracing or debug logging is on
- */
- protected boolean isTraceOrDebugOn(boolean isTraceOn) {
- return isTraceOn || log.isDebugEnabled();
- }
-
- /**
- * Perform Trace and Debug logging of a message @INFO (trace) and DEBUG (log)
- *
- * @param traceOn is runtime trace on for this message?
- * @param msg the message to log/trace
- */
- protected void traceOrDebug(boolean traceOn, String msg) {
- if (traceOn) {
- trace.info(msg);
- }
- if (log.isDebugEnabled()) {
- log.debug(msg);
- }
- }
}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/AlgorithmContext.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/AlgorithmContext.java?rev=702579&r1=702578&r2=702579&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/AlgorithmContext.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/AlgorithmContext.java Tue Oct 7 11:38:44 2008
@@ -18,73 +18,57 @@
*/
package org.apache.synapse.endpoints.algorithms;
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.context.Replicator;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.SynapseException;
-import org.apache.synapse.util.Replicator;
/**
- * Keeps the states of the load balance algorithm.This hides where those states are kept.For a
- * cluster environment ,all states are kept in the axis2 configuration context in order to replicate
- * those states so that other synapse instance in the same cluster can see those changes .
- * This class can be evolved to keep any run time states related to the endpoint .
- * For a non-clustered environment , all data are kept locally.
- * <p/>
- * This class provide the abstraction need to separate the dynamic data from the static data
- * and improve the high cohesion and provides capability to replicate only required state at
- * a given time. This improves the performance when replicate data.
+ * Keeps the runtime state of the algorithm
*/
public class AlgorithmContext {
private static final Log log = LogFactory.getLog(AlgorithmContext.class);
- /* The static constant only for construct key prefix for each property in a dispatcher context
- * as it is need when those property state going to replicate in a cluster env. */
- private static final String UNDERSCORE_STRING = "_";
- private static final String CURRENT_EPR = "currentEPR";
-
- /* The axis configuration context- this will hold the all callers states
- * when doing throttling in a clustered environment. */
- private ConfigurationContext configCtx;
-
- /* Is this env. support clustering*/
- private boolean isClusteringEnable = false;
-
- /* The key for 'currentEPR' attribute and this is used when this attribute value being
- * replicated */
- private String currentEPRPropertyKey;
+ private static final String KEY_PREFIX = "synapse.endpoint.lb.algorithm.";
+ private static final String CURRENT_EPR = ".current_epr";
+
+ /* The axis2 configuration context - this holds state in a clustered environment. */
+ private ConfigurationContext cfgCtx;
+
+ /* Are we supporting clustering ? */
+ private Boolean isClusteringEnabled = null;
+
+ /* The key for 'currentEPR' attribute when replicated in a cluster */
+ private String CURRENT_EPR_PROP_KEY;
/* The pointer to current epr - The position of the current EPR */
private int currentEPR = 0;
+ public AlgorithmContext(boolean clusteringEnabled, ConfigurationContext cfgCtx, String endpointName) {
+ this.cfgCtx = cfgCtx;
+ if (clusteringEnabled) {
+ isClusteringEnabled = Boolean.TRUE;
+ }
+ CURRENT_EPR_PROP_KEY = KEY_PREFIX + endpointName + CURRENT_EPR;
+ }
+
/**
- * To get the position of the current EPR
- * If there is no value and if there will not appear any errors , then '0' will be returned.
+ * Gets the position of the current EPR to use. Defaults to 0 - i.e. the first endpoint
*
* @return The position of the current EPR
*/
public int getCurrentEndpointIndex() {
- if (this.isClusteringEnable) { // if this is a clustering env.
+ if (Boolean.TRUE.equals(isClusteringEnabled)) {
- if (this.currentEPRPropertyKey == null || "".equals(this.currentEPRPropertyKey)) {
- handleException("Cannot find the required key to find the " +
- "shared state of the 'currentEPR' attribute");
- }
-
- Object value = this.configCtx.getPropertyNonReplicable(this.currentEPRPropertyKey);
+ Object value = cfgCtx.getPropertyNonReplicable(this.CURRENT_EPR_PROP_KEY);
if (value == null) {
return 0;
- }
- try {
- if (value instanceof Integer) {
- return (Integer) value;
- } else if (value instanceof String) {
- return Integer.parseInt((String) value);
- }
- } catch (NumberFormatException e) {
- handleException("The invalid value for the 'currentEPR' attribute");
+ } else if (value instanceof Integer) {
+ return ((Integer) value);
}
} else {
return currentEPR;
@@ -97,21 +81,18 @@
*
* @param currentEPR The current position
*/
- public void setCurrentEndpointIndex(int currentEPR) {
+ public void setCurrentEPR(int currentEPR) {
- if (isClusteringEnable) { // if this is a clustering env.
+ if (Boolean.TRUE.equals(isClusteringEnabled)) {
- if (currentEPRPropertyKey != null) {
- if (log.isDebugEnabled()) {
- log.debug("Setting the current EPR " + currentEPR
- + " with the key " + currentEPRPropertyKey);
- }
- // Sets the property and replicates the current state so that all instances
- Replicator.setAndReplicateState(currentEPRPropertyKey, currentEPR, configCtx);
+ if (log.isDebugEnabled()) {
+ log.debug("Set EPR with key : " + CURRENT_EPR_PROP_KEY + " as : " + currentEPR);
}
+ setAndReplicateState(CURRENT_EPR_PROP_KEY, currentEPR);
+
} else {
if (log.isDebugEnabled()) {
- log.debug("Setting the current EPR " + currentEPR);
+ log.debug("Setting the current EPR as : " + currentEPR);
}
this.currentEPR = currentEPR;
}
@@ -123,55 +104,54 @@
* @return Returns the ConfigurationContext instance
*/
public ConfigurationContext getConfigurationContext() {
- return configCtx;
+ return cfgCtx;
}
/**
- * Sets the ConfigurationContext instance . This is only used for cluster env.
- * By setting this , indicates that this is a cluster env.
+ * Helper method for handling errors.
*
- * @param configCtx The ConfigurationContext instance
+ * @param msg The error message
*/
- public void setConfigurationContext(ConfigurationContext configCtx) {
-
- if (configCtx == null) {
- handleException("The ConfigurationContext cannot be null when system " +
- "in a cluster environment");
- }
-
- this.configCtx = configCtx;
- this.isClusteringEnable = true; // Now, the environment is considered as a cluster
+ protected void handleException(String msg) {
+ log.error(msg);
+ throw new SynapseException(msg);
}
/**
- * Sets the identifier for this algorithm context , so that , this can be identified
- * uniquely across the cluster. The id will be the name of the endpoint
+ * Helper method for handling errors.
*
- * @param contextID The Id for this algorithm context
+ * @param msg The error message
+ * @param e The exception
*/
- public void setContextID(String contextID) {
-
- if (contextID == null || "".equals(contextID)) {
- handleException("The Context ID cannot be null when system in a cluster environment");
- }
-
- //Making required key for each property in the algorithm context- Those will be used when
- //replicating states
- StringBuffer buffer = new StringBuffer();
- buffer.append(contextID);
- buffer.append(UNDERSCORE_STRING);
- buffer.append(CURRENT_EPR);
- currentEPRPropertyKey = buffer.toString();
+ protected void handleException(String msg, Exception e) {
+ log.error(msg, e);
+ throw new SynapseException(msg, e);
}
-
/**
- * Helper methods for handle errors.
+ * Helper method that replicates the state of the property with the given key.
+ * Sets the property and replicates the current state so that all instances
+ * across the cluster can see this state
*
- * @param msg The error message
+ * @param key The key of the property
+ * @param value The value of the property
*/
- private void handleException(String msg) {
- log.error(msg);
- throw new SynapseException(msg);
+ private void setAndReplicateState(String key, Object value) {
+
+ if (cfgCtx != null && key != null && value != null) {
+
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Replicating property key : " + key + " as : " + value);
+ }
+ cfgCtx.setProperty(key, value);
+ Replicator.replicate(cfgCtx, new String[]{key});
+
+ } catch (ClusteringFault clusteringFault) {
+ handleException("Error replicating property : " + key + " as : " +
+ value, clusteringFault);
+ }
+ }
}
+
}
\ No newline at end of file
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/LoadbalanceAlgorithm.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/LoadbalanceAlgorithm.java?rev=702579&r1=702578&r2=702579&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/LoadbalanceAlgorithm.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/LoadbalanceAlgorithm.java Tue Oct 7 11:38:44 2008
@@ -36,14 +36,15 @@
*
* @param members The application members
*/
- void setApplicationMembers(List<Member> members);
-
- /**
- * Set the endpoints
- *
- * @param endpoints The endpoints
- */
- void setEndpoints(List<Endpoint> endpoints);
+// TODO FIX-RUWAN
+// void setApplicationMembers(List<Member> members);
+//
+// /**
+// * Set the endpoints
+// *
+// * @param endpoints The endpoints
+// */
+// void setEndpoints(List<Endpoint> endpoints);
/**
* This method returns the next node according to the algorithm implementation.
@@ -63,7 +64,8 @@
* @param algorithmContext The context in which holds run time states related to the algorithm
* @return Next application member to which the request has to be sent to
*/
- Member getNextApplicationMember(AlgorithmContext algorithmContext);
+// TODO FIX-RUWAN
+// Member getNextApplicationMember(AlgorithmContext algorithmContext);
/**
* Resets the algorithm to its initial position. Initial position depends on the implementation.
@@ -71,4 +73,10 @@
* @param algorithmContext The context in which holds run time states related to the algorithm
*/
void reset(AlgorithmContext algorithmContext);
+
+ /**
+ * Return the name of the load balancing algorithm
+ * @return the name of the algorithm implemented
+ */
+ public String getName();
}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/RoundRobin.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/RoundRobin.java?rev=702579&r1=702578&r2=702579&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/RoundRobin.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/RoundRobin.java Tue Oct 7 11:38:44 2008
@@ -19,12 +19,13 @@
package org.apache.synapse.endpoints.algorithms;
-import org.apache.axis2.clustering.Member;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseConstants;
import org.apache.synapse.endpoints.Endpoint;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -38,21 +39,9 @@
/**
* Endpoints list for the round robin algorithm
*/
- private List<Endpoint> endpoints = null;
+ private List endpoints = null;
- /**
- * List of application members in the loadb balance group
- */
- private List<Member> members;
-
- public RoundRobin() {
- }
-
- public void setApplicationMembers(List<Member> members) {
- this.members = members;
- }
-
- public void setEndpoints(List<Endpoint> endpoints) {
+ public RoundRobin(List endpoints) {
this.endpoints = endpoints;
}
@@ -60,16 +49,12 @@
* Choose an active endpoint using the round robin algorithm. If there are no active endpoints
* available, returns null.
*
- * @param synapseMessageContext MessageContext instance which holds all per-message properties
- * @param algorithmContext The context in which holds run time states related to the algorithm
+ * @param synCtx MessageContext instance which holds all per-message properties
+ * @param algorithmContext The context in which holds run time states related to the algorithm
* @return endpoint to send the next message
*/
- public Endpoint getNextEndpoint(MessageContext synapseMessageContext,
- AlgorithmContext algorithmContext) {
-
- if (log.isDebugEnabled()) {
- log.debug("Using the Round Robin loadbalancing algorithm to select the next endpoint");
- }
+ public Endpoint getNextEndpoint(MessageContext synCtx,
+ AlgorithmContext algorithmContext) {
Endpoint nextEndpoint;
int attempts = 0;
@@ -77,53 +62,34 @@
do {
// two successive clients could get the same endpoint if not synchronized.
synchronized (this) {
- nextEndpoint = endpoints.get(currentEPR);
+ nextEndpoint = (Endpoint) endpoints.get(currentEPR);
if (currentEPR == endpoints.size() - 1) {
currentEPR = 0;
} else {
currentEPR++;
}
- algorithmContext.setCurrentEndpointIndex(currentEPR);
+ algorithmContext.setCurrentEPR(currentEPR);
}
attempts++;
if (attempts > endpoints.size()) {
- log.warn("Couldn't find an endpoint from the Round Robin loadbalancing algorithm");
return null;
}
- } while (!nextEndpoint.isActive(synapseMessageContext));
+ } while (!nextEndpoint.readyToSend());
return nextEndpoint;
}
- public Member getNextApplicationMember(AlgorithmContext algorithmContext) {
- if (members.size() == 0) {
- return null;
- }
- int currentMemberIndex = algorithmContext.getCurrentEndpointIndex();
- if (currentMemberIndex >= members.size()) {
- currentMemberIndex = 0;
- }
- Member current = members.get(currentMemberIndex);
- if (currentMemberIndex == members.size() - 1) {
- currentMemberIndex = 0;
- } else {
- currentMemberIndex++;
- }
- algorithmContext.setCurrentEndpointIndex(currentMemberIndex);
- if(log.isDebugEnabled()) {
- log.debug("Members : " + members.size());
- log.debug("Current member: " + current);
- }
- return current;
- }
-
public void reset(AlgorithmContext algorithmContext) {
if (log.isDebugEnabled()) {
log.debug("Resetting the Round Robin loadbalancing algorithm ...");
}
- algorithmContext.setCurrentEndpointIndex(0);
+ algorithmContext.setCurrentEPR(0);
+ }
+
+ public String getName() {
+ return "RoundRobin";
}
}
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/AbstractDispatcher.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/AbstractDispatcher.java?rev=702579&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/AbstractDispatcher.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/AbstractDispatcher.java Tue Oct 7 11:38:44 2008
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.endpoints.dispatch;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.endpoints.Endpoint;
+
+import javax.xml.namespace.QName;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for session dispatchers; provides helpers to extract and remove session ids.
+ */
+public abstract class AbstractDispatcher implements Dispatcher {
+
+ protected Log log;
+ private final static String TRANSPORT_HEADERS = "TRANSPORT_HEADERS";
+
+ protected AbstractDispatcher() {
+ log = LogFactory.getLog(this.getClass());
+ }
+
+ public List<Endpoint> getEndpoints(SessionInformation sessionInformation) {
+ return SALSessions.getInstance().getChildEndpoints(sessionInformation);
+ }
+
+ protected String extractSessionID(OMElement header, QName keyQName) {
+
+ OMElement sgcIDElm = getHeaderBlock(header, keyQName);
+
+ if (sgcIDElm != null) {
+ String sgcID = sgcIDElm.getText();
+
+ if (sgcID != null && !"".equals(sgcID)) {
+ return sgcID.trim();
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(keyQName + " is null or empty");
+ }
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Couldn't find the " + keyQName + " SOAP header to find the session");
+ }
+ }
+ return null;
+ }
+
+ protected String extractSessionID(MessageContext synCtx, String key) {
+
+ if (key != null) {
+ Map headerMap = getTransportHeaderMap(synCtx);
+ if (headerMap != null) {
+
+ Object cookie = headerMap.get(key);
+
+ if (cookie instanceof String) {
+ return (String) cookie;
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Couldn't find the " + key + " header to find the session");
+ }
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Couldn't find the TRANSPORT_HEADERS to find the session");
+ }
+
+ }
+ }
+ return null;
+ }
+
+ protected void removeSessionID(MessageContext synCtx, String key) {
+
+ if (key != null) {
+ Map headerMap = getTransportHeaderMap(synCtx);
+ if (headerMap != null) {
+ headerMap.remove(key);
+ }
+ }
+ }
+
+ protected void removeSessionID(OMElement header, QName keyQName) {
+
+ OMElement sgcIDElm = getHeaderBlock(header, keyQName);
+ if (sgcIDElm != null) {
+ sgcIDElm.detach();
+ }
+ }
+
+
+ private Map getTransportHeaderMap(MessageContext synCtx) {
+
+ org.apache.axis2.context.MessageContext axis2MessageContext =
+ ((Axis2MessageContext) synCtx).getAxis2MessageContext();
+
+ Object o = axis2MessageContext.getProperty(TRANSPORT_HEADERS);
+ if (o != null && o instanceof Map) {
+ return (Map) o;
+ }
+ return null;
+ }
+
+ private OMElement getHeaderBlock(OMElement soapHeader, QName keyQName) {
+
+ if (soapHeader != null) {
+ return soapHeader.getFirstChildWithName(keyQName);
+ }
+ return null;
+ }
+}
Propchange: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/AbstractDispatcher.java
------------------------------------------------------------------------------
svn:executable = *
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/Dispatcher.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/Dispatcher.java?rev=702579&r1=702578&r2=702579&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/Dispatcher.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/Dispatcher.java Tue Oct 7 11:38:44 2008
@@ -22,6 +22,8 @@
import org.apache.synapse.MessageContext;
import org.apache.synapse.endpoints.Endpoint;
+import java.util.List;
+
/**
* Defines the behavior of session dispatchers. There can be two dispatcher types. Server initiated
* session dispatchers and client initiated session dispatchers. In the former one, server generates
@@ -37,30 +39,25 @@
* synapseMessageContext is not found it should return null.
*
* @param synCtx client -> esb message context.
- * @param dispatcherContext context for dispatching
* @return Endpoint Endpoint associated with this session.
*/
- public Endpoint getEndpoint(MessageContext synCtx, DispatcherContext dispatcherContext);
+ public SessionInformation getSession(MessageContext synCtx);
/**
* Updates the session maps. This will be called in the first client -> synapse -> server flow
* for client initiated sessions. For server initiated sessions, this will be called in the
* first server -> synapse -> client flow.
*
- * @param synCtx SynapseMessageContext
- * @param dispatcherContext context for dispatching
- * @param endpoint Selected endpoint for this session.
+ * @param synCtx SynapseMessageContext
*/
- public void updateSession(MessageContext synCtx, DispatcherContext dispatcherContext,
- Endpoint endpoint);
+ public void updateSession(MessageContext synCtx);
/**
* Removes the session belonging to the given message context.
*
* @param synCtx MessageContext containing an session ID.
- * @param dispatcherContext context for dispatching
*/
- public void unbind(MessageContext synCtx, DispatcherContext dispatcherContext);
+ public void unbind(MessageContext synCtx);
/**
* Determine whether the session supported by the implementing dispatcher is initiated by the
@@ -69,4 +66,18 @@
* @return true, if the session is initiated by the server. false, otherwise.
*/
public boolean isServerInitiatedSession();
+
+ /**
+ * Returns the endpoint sequence associated with the current session, without the root
+ *
+ * @param sessionInformation Current Session information
+ * @return Endpoint sequence
+ */
+ public List<Endpoint> getEndpoints(SessionInformation sessionInformation);
+
+ /**
+ * Remove the session Id - To clear out session information from current message
+ * @param syCtx MessageContext containing a session ID
+ */
+ public void removeSessionID(MessageContext syCtx);
}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/HttpSessionDispatcher.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/HttpSessionDispatcher.java?rev=702579&r1=702578&r2=702579&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/HttpSessionDispatcher.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/HttpSessionDispatcher.java Tue Oct 7 11:38:44 2008
@@ -19,24 +19,17 @@
package org.apache.synapse.endpoints.dispatch;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.synapse.MessageContext;
-import org.apache.synapse.core.axis2.Axis2MessageContext;
-import org.apache.synapse.endpoints.Endpoint;
-import java.util.Map;
/**
* Dispatches sessions based on HTTP cookies. Session is initiated by the server in the first
* response when it sends "Set-Cookie" HTTP header with the session ID. For all successive messages
* client should send "Cookie" HTTP header with session ID send by the server.
*/
-public class HttpSessionDispatcher implements Dispatcher {
+public class HttpSessionDispatcher extends AbstractDispatcher {
- private static final Log log = LogFactory.getLog(HttpSessionDispatcher.class);
- private final static String TRANSPORT_HEADERS = "TRANSPORT_HEADERS";
/*HTTP Headers */
private final static String COOKIE = "Cookie";
private final static String SET_COOKIE = "Set-Cookie";
@@ -49,40 +42,8 @@
* @param synCtx MessageContext possibly containing a "Cookie" HTTP header.
* @return Endpoint Server endpoint for the given HTTP session.
*/
- public Endpoint getEndpoint(MessageContext synCtx, DispatcherContext dispatcherContext) {
-
- Endpoint endpoint = null;
-
- org.apache.axis2.context.MessageContext axis2MessageContext =
- ((Axis2MessageContext) synCtx).getAxis2MessageContext();
-
- Object o = axis2MessageContext.getProperty(TRANSPORT_HEADERS);
- if (o != null && o instanceof Map) {
- Map headerMap = (Map) o;
- Object cookie = headerMap.get(COOKIE);
-
- if (cookie != null && cookie instanceof String) {
-
- if (log.isDebugEnabled()) {
- log.debug("Using the HTTP header 'Cookie: " + cookie
- + "' to retrieve the endpoint in the transport session");
- }
-
- Object ep = dispatcherContext.getEndpoint((String) cookie);
- if (ep != null && ep instanceof Endpoint) {
- endpoint = (Endpoint) ep;
- } else if (log.isDebugEnabled()) {
- log.debug("No endpoint found in the transport " +
- "session for the session id " + cookie);
- }
-
- } else if (log.isDebugEnabled()) {
- log.debug("No 'Cookie' HTTP headers found to extract the " +
- "endpoint from the transport session");
- }
- }
-
- return endpoint;
+ public SessionInformation getSession(MessageContext synCtx) {
+ return SALSessions.getInstance().getSession(extractSessionID(synCtx, COOKIE));
}
/**
@@ -90,76 +51,39 @@
* session ID is not already in the session map update the session map by mapping the cookie
* to the endpoint.
*
- * @param synCtx MessageContext possibly containing the "Set-Cookie" HTTP header.
- * @param endpoint Endpoint to be mapped to the session.
+ * @param synCtx MessageContext possibly containing the "Set-Cookie" HTTP header.
*/
- public void updateSession(MessageContext synCtx, DispatcherContext dispatcherContext,
- Endpoint endpoint) {
+ public void updateSession(MessageContext synCtx) {
- if (endpoint == null || dispatcherContext == null) {
- return;
- }
+ Object cookie = extractSessionID(synCtx, SET_COOKIE);
- org.apache.axis2.context.MessageContext axis2MessageContext =
- ((Axis2MessageContext) synCtx).getAxis2MessageContext();
+ if (cookie != null && cookie instanceof String) {
- Object o = axis2MessageContext.getProperty(TRANSPORT_HEADERS);
- if (o != null && o instanceof Map) {
- Map headerMap = (Map) o;
- Object cookie = headerMap.get(SET_COOKIE);
-
- if (cookie != null && cookie instanceof String) {
-
- // extract the first name value pair of the Set-Cookie header, which is considered
- // as the session id which will be sent back from the client with the Cookie header
- // for example;
- // Set-Cookie: JSESSIONID=760764CB72E96A7221506823748CF2AE; Path=/
- // will result in the session id "JSESSIONID=760764CB72E96A7221506823748CF2AE"
- // and the client is expected to send the Cookie header as;
- // Cookie: JSESSIONID=760764CB72E96A7221506823748CF2AE
- if (log.isDebugEnabled()) {
- log.debug("Found the HTTP header 'Set-Cookie: "
- + cookie + "' for updating the session");
- }
- String sessionId = ((String) cookie).split(";")[0];
-
- if (log.isDebugEnabled()) {
- log.debug("Using the session id '" + sessionId +
- "' extracted from the Set-Cookie header to update the session " +
- "with the endpoint " + endpoint);
- }
- dispatcherContext.setEndpoint(sessionId, endpoint);
-
- } else if (log.isDebugEnabled()) {
- log.debug("No 'Set-Cookie' HTTP header is specified in " +
- "the message to update the session");
+ // extract the first name value pair of the Set-Cookie header, which is considered
+ // as the session id which will be sent back from the client with the Cookie header
+ // for example;
+ // Set-Cookie: JSESSIONID=760764CB72E96A7221506823748CF2AE; Path=/
+ // will result in the session id "JSESSIONID=760764CB72E96A7221506823748CF2AE"
+ // and the client is expected to send the Cookie header as;
+ // Cookie: JSESSIONID=760764CB72E96A7221506823748CF2AE
+ if (log.isDebugEnabled()) {
+ log.debug("Found the HTTP header 'Set-Cookie: "
+ + cookie + "' for updating the session");
}
- }
- }
+ String sessionId = ((String) cookie).split(";")[0];
- public void unbind(MessageContext synCtx, DispatcherContext dispatcherContext) {
-
- org.apache.axis2.context.MessageContext axis2MessageContext =
- ((Axis2MessageContext) synCtx).getAxis2MessageContext();
-
- Object o = axis2MessageContext.getProperty(TRANSPORT_HEADERS);
- if (o != null && o instanceof Map) {
- Map headerMap = (Map) o;
- Object cookie = headerMap.get(COOKIE);
-
- if (cookie != null && cookie instanceof String) {
-
- if (log.isDebugEnabled()) {
- log.debug("Using the HTTP header 'Cookie: "
- + cookie + "' to unbind the session");
- }
- dispatcherContext.removeSession((String) cookie);
-
- } else if (log.isDebugEnabled()) {
- log.debug("No 'Cookie' HTTP header is specified in " +
- "the message to unbind the session");
+ if (log.isDebugEnabled()) {
+ log.debug("Using the session id '" + sessionId +
+ "' extracted from the Set-Cookie header ");
}
+
+ SALSessions.getInstance().updateSession(synCtx, sessionId);
}
+
+ }
+
+ public void unbind(MessageContext synCtx) {
+ SALSessions.getInstance().removeSession(extractSessionID(synCtx, COOKIE));
}
/**
@@ -170,4 +94,8 @@
public boolean isServerInitiatedSession() {
return true;
}
+
+ public void removeSessionID(MessageContext syCtx) {
+ removeSessionID(syCtx, COOKIE);
+ }
}