You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2007/06/08 13:51:08 UTC
svn commit: r545487 - in /webservices/axis2/trunk/java/modules:
clustering/src/org/apache/axis2/clustering/context/
clustering/src/org/apache/axis2/clustering/handlers/
clustering/src/org/apache/axis2/clustering/tribes/
kernel/src/org/apache/axis2/clustering/
kernel/src/org/apache/axis2/context/
Author: azeez
Date: Fri Jun 8 04:51:07 2007
New Revision: 545487
URL: http://svn.apache.org/viewvc?view=rev&rev=545487
Log:
Skip the replication logic when there is only a single member in the cluster.
Do not keep track of property differences when context replication is disabled or when there are no members in the cluster.
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/context/AbstractContext.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java?view=diff&rev=545487&r1=545486&r2=545487
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java Fri Jun 8 04:51:07 2007
@@ -114,38 +114,44 @@
Map excludedPropertyPatterns,
boolean includeAllProperties) {
if (!includeAllProperties) {
- Map diffs = context.getPropertyDifferences();
- synchronized (context) {
- for (Iterator iter = diffs.keySet().iterator(); iter.hasNext();) {
- String key = (String) iter.next();
- Object prop = context.getPropertyNonReplicable(key);
- if (prop instanceof Serializable) { // First check whether it is serializable
- // Next check whether it matches an excluded pattern
- if (!isExcluded(key, context.getClass().getName(), excludedPropertyPatterns))
- {
- log.debug("sending property =" + key + "-" + prop);
- PropertyDifference diff = (PropertyDifference) diffs.get(key);
- diff.setValue(prop);
- updateCmd.addProperty(diff);
+ // Sometimes, there can be failures, so if an exception occurs, we retry
+ while (true) {
+ Map diffs = context.getPropertyDifferences();
+ try {
+ for (Iterator iter = diffs.keySet().iterator(); iter.hasNext();) {
+ String key = (String) iter.next();
+ Object prop = context.getPropertyNonReplicable(key);
+
+ // First check whether it is serializable
+ if (prop instanceof Serializable) {
+
+ // Next check whether it matches an excluded pattern
+ if (!isExcluded(key,
+ context.getClass().getName(),
+ excludedPropertyPatterns)) {
+ log.debug("sending property =" + key + "-" + prop);
+ PropertyDifference diff = (PropertyDifference) diffs.get(key);
+ diff.setValue(prop);
+ updateCmd.addProperty(diff);
+ }
}
}
+ break;
+ } catch (Exception ignored) {
}
}
} else {
- synchronized (context) {
- for (Iterator iter = context.getPropertyNames(); iter.hasNext();) {
- String key = (String) iter.next();
- Object prop = context.getPropertyNonReplicable(key);
- if (prop instanceof Serializable) { // First check whether it is serializable
+ for (Iterator iter = context.getPropertyNames(); iter.hasNext();) {
+ String key = (String) iter.next();
+ Object prop = context.getPropertyNonReplicable(key);
+ if (prop instanceof Serializable) { // First check whether it is serializable
- // Next check whether it matches an excluded pattern
- if (!isExcluded(key, context.getClass().getName(), excludedPropertyPatterns))
- {
- log.debug("sending property =" + key + "-" + prop);
- PropertyDifference diff = new PropertyDifference(key, prop, false);
- updateCmd.addProperty(diff);
- }
+ // Next check whether it matches an excluded pattern
+ if (!isExcluded(key, context.getClass().getName(), excludedPropertyPatterns)) {
+ log.debug("sending property =" + key + "-" + prop);
+ PropertyDifference diff = new PropertyDifference(key, prop, false);
+ updateCmd.addProperty(diff);
}
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java?view=diff&rev=545487&r1=545486&r2=545487
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java Fri Jun 8 04:51:07 2007
@@ -49,6 +49,14 @@
}
public void flowComplete(MessageContext msgContext) {
+
+ // If there are no members, we need not do any replication
+ ClusterManager clusterManager =
+ msgContext.getConfigurationContext().getAxisConfiguration().getClusterManager();
+ if(clusterManager != null && clusterManager.getMemberCount() == 0){
+ return;
+ }
+
int flow = msgContext.getFLOW();
String mep = msgContext.getAxisOperation().getMessageExchangePattern();
@@ -59,7 +67,7 @@
mep.equals(WSDL2Constants.MEP_URI_IN_OPTIONAL_OUT) ||
mep.equals(WSDL2Constants.MEP_URI_ROBUST_IN_ONLY))
&& (flow == MessageContext.IN_FLOW || flow == MessageContext.IN_FAULT_FLOW));
-
+
boolean replicateOnOutFlow =
(mep.equals(WSDL2Constants.MEP_URI_IN_OUT) ||
mep.equals(WSDL2Constants.MEP_URI_OUT_ONLY) ||
@@ -69,7 +77,7 @@
&& (flow == MessageContext.OUT_FLOW || flow == MessageContext.OUT_FAULT_FLOW);
if (replicateOnInFLow || replicateOnOutFlow) {
- System.err.println("### [FLOW COMPLETE] Going to replicate state. Flow:" + flow);
+ log.debug("### [FLOW COMPLETE] Going to replicate state. Flow:" + flow);
try {
replicateState(msgContext);
} catch (Exception e) {
@@ -122,7 +130,7 @@
// Wait till all members have ACKed receipt & successful processing of
// the message with UUID 'msgUUID'
- /*do {
+ do {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
@@ -133,7 +141,7 @@
throw new ClusteringFault("ACKs not received from all members within 20 sec. " +
"Aborting wait.");
}
- } while (!contextManager.isMessageAcknowledged(msgUUID));*/
+ } while (!contextManager.isMessageAcknowledged(msgUUID));
}
} else {
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java?view=diff&rev=545487&r1=545486&r2=545487
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java Fri Jun 8 04:51:07 2007
@@ -18,6 +18,8 @@
import org.apache.axis2.clustering.ClusteringFault;
import org.apache.axis2.clustering.context.ContextClusteringCommand;
import org.apache.catalina.tribes.Member;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import java.util.Hashtable;
import java.util.List;
@@ -25,10 +27,10 @@
import java.util.Vector;
/**
- *
+ *
*/
public final class AckManager {
-
+ private static Log log = LogFactory.getLog(AckManager.class);
private static Map messageAckTable = new Hashtable();
public static void addInitialAcknowledgement(ContextClusteringCommand command) {
@@ -40,7 +42,9 @@
MessageACK ack = (MessageACK) messageAckTable.get(messageUniqueId);
if (ack != null) {
List memberList = ack.getMemberList();
- memberList.add(memberId);
+ if (!memberList.contains(memberId)) { // If the member has not already ACKed
+ memberList.add(memberId);
+ }
}
}
@@ -59,15 +63,15 @@
for (int i = 0; i < members.length; i++) {
Member member = members[i];
if (!memberList.contains(member.getName())) {
- System.err.println("\n\n");
- System.err.println("##### NO ACK from member " + member.getName());
- System.err.println("#### ACKed member list=" + memberList);
- System.err.println("\n\n");
+ log.debug("[NO ACK] from member " + member.getName());
+ log.debug("ACKed member list=" + memberList);
// At this point, resend the original message back to the node which has not
// sent an ACK
- sender.sendToMember(ack.getCommand(), member);
+ if (member.isReady()) {
+ sender.sendToMember(ack.getCommand(), member);
+ }
- //TODO: Check whether this is a new member. If then send the msg
+ //TODO: Enhancement, Check whether this is a new member. If then send the msg
isAcknowledged = false;
break;
} else {
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?view=diff&rev=545487&r1=545486&r2=545487
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Fri Jun 8 04:51:07 2007
@@ -31,21 +31,32 @@
private Channel channel;
public void sendToGroup(ClusteringCommand msg) throws ClusteringFault {
- if(channel == null) return;
- Member[] members = channel.getMembers();
- if (members.length > 0) {
- try {
- channel.send(members, msg, Channel.DEFAULT);
- log.debug("Sent " + msg + " to group");
- } catch (ChannelException e) {
- String message = "Error sending command message : " + msg;
- throw new ClusteringFault(message, e);
+ if (channel == null) {
+ return;
+ }
+
+ // Keep retrying, since at the point of trying to send the msg, a member may leave the group
+ while (true) {
+ if (channel.getMembers().length > 0) {
+ try {
+ channel.send(channel.getMembers(), msg, Channel.DEFAULT);
+ log.debug("Sent " + msg + " to group");
+ break;
+ } catch (ChannelException e) {
+ String message = "Error sending command message : " + msg +
+ ". Reason " + e.getMessage();
+ log.warn(message);
+ }
+ } else {
+ break;
}
}
}
public void sendToSelf(ClusteringCommand msg) throws ClusteringFault {
- if(channel == null) return;
+ if (channel == null) {
+ return;
+ }
try {
channel.send(new Member[]{channel.getLocalMember(true)},
msg,
@@ -57,15 +68,23 @@
}
public void sendToGroup(Throwable throwable) throws ClusteringFault {
- if(channel == null) return;
- Member[] group = channel.getMembers();
- if (group.length > 0) {
- try {
- channel.send(group, throwable, 0);
- log.debug("Sent " + throwable + " to group");
- } catch (ChannelException e) {
- String message = "Error sending exception message : " + throwable;
- throw new ClusteringFault(message, e);
+ if (channel == null) {
+ return;
+ }
+
+ // Keep retrying, since at the point of trying to send the msg, a member may leave the group
+ while (true) {
+ if (channel.getMembers().length > 0) {
+ try {
+ channel.send(channel.getMembers(), throwable, 0);
+ log.debug("Sent " + throwable + " to group");
+ } catch (ChannelException e) {
+ String message = "Error sending exception message : " + throwable +
+ ". Reason " + e.getMessage();
+ log.warn(message);
+ }
+ } else {
+ break;
}
}
}
@@ -75,7 +94,9 @@
channel.send(new Member[]{member}, cmd, Channel.DEFAULT);
log.debug("Sent " + cmd + " to " + member.getName());
} catch (ChannelException e) {
- throw new ClusteringFault(e);
+ String message = "Could not send message to " + member.getName() +
+ ". Reason " + e.getMessage();
+ log.warn(message);
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?view=diff&rev=545487&r1=545486&r2=545487
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Fri Jun 8 04:51:07 2007
@@ -239,4 +239,8 @@
channelListener.setConfigurationContext(configurationContext);
}
}
+
+ public int getMemberCount() {
+ return channel.getMembers().length;
+ }
}
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java?view=diff&rev=545487&r1=545486&r2=545487
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java Fri Jun 8 04:51:07 2007
@@ -66,4 +66,11 @@
* @param configurationContext
*/
void setConfigurationContext(ConfigurationContext configurationContext);
+
+ /**
+ * Get the total number of members in the cluster
+ *
+ * @return The total number of members in the cluster
+ */
+ int getMemberCount();
}
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/context/AbstractContext.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/context/AbstractContext.java?view=diff&rev=545487&r1=545486&r2=545487
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/context/AbstractContext.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/context/AbstractContext.java Fri Jun 8 04:51:07 2007
@@ -77,7 +77,7 @@
* @return Iterator over a collection of keys
*/
public Iterator getPropertyNames() {
- if(properties == null){
+ if (properties == null) {
properties = new HashMap();
}
return properties.keySet().iterator();
@@ -97,7 +97,7 @@
// Assume that a property is which is read may be updated.
// i.e. The object pointed to by 'value' may be modified after it is read
- propertyDifferences.put(key, new PropertyDifference(key, false));
+ addPropertyDifference(key);
}
return obj;
}
@@ -128,7 +128,18 @@
this.properties = new HashMap();
}
properties.put(key, value);
- propertyDifferences.put(key, new PropertyDifference(key, false));
+ addPropertyDifference(key);
+ }
+
+ private void addPropertyDifference(String key) {
+ // Add the property differences only if Context replication is enabled,
+ // and there are members in the cluster
+ ClusterManager clusterManager = getRootContext().getAxisConfiguration().getClusterManager();
+ if (clusterManager != null &&
+ clusterManager.getContextManager() != null &&
+ clusterManager.getMemberCount() != 0) {
+ propertyDifferences.put(key, new PropertyDifference(key, false));
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org