You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by in...@apache.org on 2008/05/09 20:38:05 UTC
svn commit: r654906 - in /synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse: endpoints/ mediators/builtin/
Author: indika
Date: Fri May 9 11:38:05 2008
New Revision: 654906
URL: http://svn.apache.org/viewvc?rev=654906&view=rev
Log:
Fix an issue in the session-affinity load-balance endpoint that occurs when the session is started by the server
Modified:
synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/AddressEndpoint.java
synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/FailoverEndpoint.java
synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java
synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/SALoadbalanceEndpoint.java
synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/WSDLEndpoint.java
synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/mediators/builtin/SendMediator.java
Modified: synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/AddressEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/AddressEndpoint.java?rev=654906&r1=654905&r2=654906&view=diff
==============================================================================
--- synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/AddressEndpoint.java (original)
+++ synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/AddressEndpoint.java Fri May 9 11:38:05 2008
@@ -61,10 +61,15 @@
if (System.currentTimeMillis() > recoverOn) {
active = true;
endpointContext.setActive(true);
- endpointContext.setRecoverOn(0);
+ endpointContext.setRecoverOn(0);
}
}
+
+ if (log.isDebugEnabled()) {
+ log.debug("Endpoint '" + getName() + "' is in state ' " + active + " '");
+ }
+
return active;
}
Modified: synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/FailoverEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/FailoverEndpoint.java?rev=654906&r1=654905&r2=654906&view=diff
==============================================================================
--- synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/FailoverEndpoint.java (original)
+++ synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/FailoverEndpoint.java Fri May 9 11:38:05 2008
@@ -175,6 +175,10 @@
}
}
}
+
+ if (log.isDebugEnabled()) {
+ log.debug("Endpoint '" + name + "' is in state ' " + active + " '");
+ }
return active;
}
Modified: synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java?rev=654906&r1=654905&r2=654906&view=diff
==============================================================================
--- synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java (original)
+++ synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java Fri May 9 11:38:05 2008
@@ -198,6 +198,10 @@
}
}
+ if (log.isDebugEnabled()) {
+ log.debug("Endpoint '" + name + "' is in state ' " + active + " '");
+ }
+
return active;
}
Modified: synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/SALoadbalanceEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/SALoadbalanceEndpoint.java?rev=654906&r1=654905&r2=654906&view=diff
==============================================================================
--- synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/SALoadbalanceEndpoint.java (original)
+++ synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/SALoadbalanceEndpoint.java Fri May 9 11:38:05 2008
@@ -62,6 +62,7 @@
private static final String FIRST_MESSAGE_IN_SESSION = "first_message_in_session";
public static final String ENDPOINT_LIST = "endpointList";
+ public static final String ROOT_ENDPOINT = "rootendpoint";
public static final String ENDPOINT_NAME_LIST = "endpointNameList";
private static final String WARN_MESSAGE = "In a clustering environment , the endpoint " +
" name should be specified" +
@@ -187,73 +188,53 @@
// replicate endpoint itself
Object o = opCtx.getPropertyNonReplicable(ENDPOINT_NAME_LIST);
- if (o != null) {
+ List epNameList;
+ if (o instanceof List) {
- List endpointList = (List) o;
- endpointList.add(endPointName);
-
- // if the next endpoint is not a session affinity one, endpoint sequence ends
- // here. but we have to add the next endpoint to the list.
- if (!(endpoint instanceof SALoadbalanceEndpoint)) {
- String name = endpoint.getName();
- if (name == null) {
- log.warn(WARN_MESSAGE);
- name = SynapseConstants.ANONYMOUS_ENDPOINT;
- }
- endpointList.add(name);
- }
+ epNameList = (List) o;
+ epNameList.add(endPointName);
} else {
// this is the first endpoint in the heirachy. so create the queue and insert
// this as the first element.
- List endpointList = new ArrayList();
- endpointList.add(endPointName);
-
- // if the next endpoint is not a session affinity one, endpoint sequence ends
- // here. but we have to add the next endpoint to the list.
- if (!(endpoint instanceof SALoadbalanceEndpoint)) {
- String name = endpoint.getName();
- if (name == null) {
- log.warn(WARN_MESSAGE);
- name = SynapseConstants.ANONYMOUS_ENDPOINT;
- }
- endpointList.add(name);
- }
-
- opCtx.setProperty(ENDPOINT_NAME_LIST, endpointList);
+ epNameList = new ArrayList();
+ epNameList.add(endPointName);
+ opCtx.setNonReplicableProperty(ROOT_ENDPOINT,this);
}
-
- }
-
- Object o = opCtx.getProperty(ENDPOINT_LIST);
-
- if (o != null) {
- List<Endpoint> endpointList = (List<Endpoint>) o;
- endpointList.add(this);
-
// if the next endpoint is not a session affinity one, endpoint sequence ends
// here. but we have to add the next endpoint to the list.
if (!(endpoint instanceof SALoadbalanceEndpoint)) {
- endpointList.add(endpoint);
+ String name = endpoint.getName();
+ if (name == null) {
+ log.warn(WARN_MESSAGE);
+ name = SynapseConstants.ANONYMOUS_ENDPOINT;
+ }
+ epNameList.add(name);
}
- } else {
+ opCtx.setProperty(ENDPOINT_NAME_LIST, epNameList);
- // this is the first endpoint in the heirachy. so create the queue and insert
- // this as the first element.
- List endpointList = new ArrayList();
- endpointList.add(this);
+ } else {
+ Object o = opCtx.getProperty(ENDPOINT_LIST);
+ List<Endpoint> endpointList;
+ if (o instanceof List) {
+ endpointList = (List<Endpoint>) o;
+ endpointList.add(this);
+ } else {
+ // this is the first endpoint in the heirachy. so create the queue and insert
+ // this as the first element.
+ endpointList = new ArrayList();
+ endpointList.add(this);
+ opCtx.setProperty(ENDPOINT_LIST, endpointList);
+ }
// if the next endpoint is not a session affinity one, endpoint sequence ends
// here. but we have to add the next endpoint to the list.
if (!(endpoint instanceof SALoadbalanceEndpoint)) {
endpointList.add(endpoint);
}
-
- opCtx.setProperty(ENDPOINT_LIST, endpointList);
}
-
} else {
dispatcher.updateSession(synMessageContext, dispatcherContext, endpoint);
}
@@ -294,7 +275,7 @@
* @param isClusteringEnable
*/
public void updateSession(MessageContext responseMsgCtx, List endpointList,
- boolean isClusteringEnable) {
+ boolean isClusteringEnable) {
Endpoint endpoint = null;
@@ -303,12 +284,12 @@
// Only keeps endpoint names , because , it is heavy task to
// replicate endpoint itself
String epNameObj = (String) endpointList.remove(0);
- for (Iterator it = endpointList.iterator(); it.hasNext();) {
- Object epObj = it.next();
- if (epObj != null && epObj instanceof Endpoint) {
- String name = ((Endpoint) epObj).getName();
+ for (Endpoint ep :endpoints) {
+ if (ep != null) {
+ String name = ep.getName();
if (name != null && name.equals(epNameObj)) {
- endpoint = ((Endpoint) epObj);
+ endpoint = ep;
+ break;
}
}
}
@@ -356,8 +337,27 @@
*/
public boolean isActive(MessageContext synMessageContext) {
// todo: implement above
-
- return endpointContext.isActive();
+ boolean active;
+ Endpoint endpoint = dispatcher.getEndpoint(synMessageContext, dispatcherContext);
+ if (endpoint == null) { // If a session is not started
+ active = endpointContext.isActive();
+ if (!active && endpoints != null) {
+ for (Endpoint ep : endpoints) {
+ if (ep.isActive(synMessageContext)) { //AND at least one child endpoint is active
+ active = true;
+ endpointContext.setActive(true);
+ // don't break the loop though we found one active endpoint. calling isActive()
+ // on all child endpoints will update their active state. so this is a good
+ // time to do that.
+ }
+ }
+ }
+ } else {
+ //If a session is started AND the binding endpoint is active.
+ active = endpoint.isActive(synMessageContext);
+ endpointContext.setActive(active);
+ }
+ return active;
}
public void setActive(boolean active, MessageContext synMessageContext) {
Modified: synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/WSDLEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/WSDLEndpoint.java?rev=654906&r1=654905&r2=654906&view=diff
==============================================================================
--- synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/WSDLEndpoint.java (original)
+++ synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/WSDLEndpoint.java Fri May 9 11:38:05 2008
@@ -240,6 +240,11 @@
endpointContext.setActive(true);
}
}
+
+ if (log.isDebugEnabled()) {
+ log.debug("Endpoint '" + name + "' is in state ' " + active + " '");
+ }
+
return active;
}
Modified: synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/mediators/builtin/SendMediator.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/mediators/builtin/SendMediator.java?rev=654906&r1=654905&r2=654906&view=diff
==============================================================================
--- synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/mediators/builtin/SendMediator.java (original)
+++ synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/mediators/builtin/SendMediator.java Fri May 9 11:38:05 2008
@@ -64,97 +64,93 @@
}
}
- // if no endpoints are defined, send where implicitly stated
- if (endpoint == null) {
+ if (synCtx.isResponse()) {
- if (traceOrDebugOn) {
- StringBuffer sb = new StringBuffer();
- sb.append("Sending " + (synCtx.isResponse() ? "response" : "request")
- + " message using implicit message properties..");
- sb.append("\nSending To: " + (synCtx.getTo() != null ?
- synCtx.getTo().getAddress() : "null"));
- sb.append("\nSOAPAction: " + (synCtx.getWSAAction() != null ?
- synCtx.getWSAAction() : "null"));
- traceOrDebug(traceOn, sb.toString());
- }
+ Axis2MessageContext axis2MsgCtx = (Axis2MessageContext) synCtx;
+ OperationContext opCtx = axis2MsgCtx.getAxis2MessageContext().getOperationContext();
- if (traceOn && trace.isTraceEnabled()) {
- trace.trace("Envelope : " + synCtx.getEnvelope());
- }
+ boolean isClusteringEnable = false;
- if (synCtx.isResponse()) {
+ // get Axis2 MessageContext and ConfigurationContext
+ org.apache.axis2.context.MessageContext axisMC =
+ axis2MsgCtx.getAxis2MessageContext();
+ ConfigurationContext cc = axisMC.getConfigurationContext();
- Axis2MessageContext axis2MsgCtx = (Axis2MessageContext) synCtx;
- OperationContext opCtx = axis2MsgCtx.getAxis2MessageContext().getOperationContext();
- Object o = opCtx.getProperty(SALoadbalanceEndpoint.ENDPOINT_LIST);
+ //The heck for clustering environment
- if (o != null && o instanceof List) {
+ ClusterManager clusterManager = cc.getAxisConfiguration().getClusterManager();
+ if (clusterManager != null &&
+ clusterManager.getContextManager() != null) {
+ isClusteringEnable = true;
+ }
- boolean isClusteringEnable = false;
+ if (isClusteringEnable) {
+ // if this is a clustering env.
+ // Only keeps endpoint names , because , it is heavy task to
+ // replicate endpoint itself
+ Object epNames = opCtx.getPropertyNonReplicable(SALoadbalanceEndpoint.ENDPOINT_NAME_LIST);
+ if (epNames != null && epNames instanceof List) {
+
+ List epNameList = (List) epNames;
+ Object obj = epNameList.remove(0);
+ if (obj != null && obj instanceof String) {
+ Object rootEPObj = opCtx.getPropertyNonReplicable(
+ SALoadbalanceEndpoint.ROOT_ENDPOINT);
+
+ if (rootEPObj != null && rootEPObj instanceof Endpoint) {
+ String name = ((Endpoint) rootEPObj).getName();
+
+ if (name != null && name.equals(obj)) {
+ Endpoint rootEP = ((Endpoint) rootEPObj);
+
+ if (rootEP instanceof SALoadbalanceEndpoint) {
+ SALoadbalanceEndpoint salEP = (SALoadbalanceEndpoint) rootEP;
+ salEP.updateSession(synCtx, epNameList,
+ isClusteringEnable);
+ }
+ }
+ }
- // get Axis2 MessageContext and ConfigurationContext
- org.apache.axis2.context.MessageContext axisMC =
- axis2MsgCtx.getAxis2MessageContext();
- ConfigurationContext cc = axisMC.getConfigurationContext();
-
- //The heck for clustering environment
-
- ClusterManager clusterManager = cc.getAxisConfiguration().getClusterManager();
- if (clusterManager != null &&
- clusterManager.getContextManager() != null) {
- isClusteringEnable = true;
}
+ opCtx.setProperty(SALoadbalanceEndpoint.ENDPOINT_NAME_LIST, epNames);
+ }
+
+ } else {
+ Object o = opCtx.getProperty(SALoadbalanceEndpoint.ENDPOINT_LIST);
+ if (o != null && o instanceof List) {
// we are in the response of the first message of a server initiated session
// so update all session maps
List epList = (List) o;
+ Object e = epList.remove(0);
-
- if (isClusteringEnable) {
- // if this is a clustering env.
- // Only keeps endpoint names , because , it is heavy task to
- // replicate endpoint itself
- Object epNames = opCtx.getProperty(SALoadbalanceEndpoint.ENDPOINT_NAME_LIST);
- if (epNames != null && epNames instanceof List) {
-
- List epNameList = (List) epNames;
- Object obj = epNameList.remove(0);
- if (obj != null && obj instanceof String) {
-
- for (Iterator it = epList.iterator(); it.hasNext();) {
- Object epObj = it.next();
-
- if (epObj != null && epObj instanceof Endpoint) {
- String name = ((Endpoint) epObj).getName();
-
- if (name != null && name.equals(obj)) {
- Endpoint ep = ((Endpoint) epObj);
-
- if (ep instanceof SALoadbalanceEndpoint) {
- SALoadbalanceEndpoint salEP
- = (SALoadbalanceEndpoint) ep;
- salEP.updateSession(synCtx, epNameList,
- isClusteringEnable);
- }
- }
- }
- }
-
- }
- }
-
- } else {
- Object e = epList.remove(0);
-
- if (e != null) {
- if (e instanceof SALoadbalanceEndpoint) {
- SALoadbalanceEndpoint salEP = (SALoadbalanceEndpoint) e;
- salEP.updateSession(synCtx, epList, isClusteringEnable);
- }
+ if (e != null) {
+ if (e instanceof SALoadbalanceEndpoint) {
+ SALoadbalanceEndpoint salEP = (SALoadbalanceEndpoint) e;
+ salEP.updateSession(synCtx, epList, isClusteringEnable);
}
}
}
}
+ }
+
+ // if no endpoints are defined, send where implicitly stated
+ if (endpoint == null) {
+
+ if (traceOrDebugOn) {
+ StringBuffer sb = new StringBuffer();
+ sb.append("Sending " + (synCtx.isResponse() ? "response" : "request")
+ + " message using implicit message properties..");
+ sb.append("\nSending To: " + (synCtx.getTo() != null ?
+ synCtx.getTo().getAddress() : "null"));
+ sb.append("\nSOAPAction: " + (synCtx.getWSAAction() != null ?
+ synCtx.getWSAAction() : "null"));
+ traceOrDebug(traceOn, sb.toString());
+ }
+
+ if (traceOn && trace.isTraceEnabled()) {
+ trace.trace("Envelope : " + synCtx.getEnvelope());
+ }
synCtx.getEnvironment().send(null, synCtx);
} else {