You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2007/06/20 14:53:57 UTC
svn commit: r549095 - in /webservices/axis2/trunk/java/modules:
clustering/src/org/apache/axis2/clustering/context/
clustering/src/org/apache/axis2/clustering/tribes/
kernel/src/org/apache/axis2/clustering/
kernel/src/org/apache/axis2/clustering/contex...
Author: azeez
Date: Wed Jun 20 05:53:56 2007
New Revision: 549095
URL: http://svn.apache.org/viewvc?view=rev&rev=549095
Log:
Fixing some synchronization issues with clustering.
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/context/AbstractContext.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java?view=diff&rev=549095&r1=549094&r2=549095
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java Wed Jun 20 05:53:56 2007
@@ -107,8 +107,9 @@
}
}
- //TODO: This is causing a concurrency issue. Need to look into this
- context.clearPropertyDifferences(); // Once we send the diffs, we should clear the diffs
+ synchronized (context) {
+ context.clearPropertyDifferences(); // Once we send the diffs, we should clear the diffs
+ }
return cmd;
}
@@ -124,45 +125,40 @@
Map excludedPropertyPatterns,
boolean includeAllProperties) {
if (!includeAllProperties) {
+ synchronized (context) {
+ Map diffs = context.getPropertyDifferences();
+ for (Iterator iter = diffs.keySet().iterator(); iter.hasNext();) {
+ String key = (String) iter.next();
+ Object prop = context.getPropertyNonReplicable(key);
- // Sometimes, there can be failures, so if an exception occurs, we retry
- // TODO: Need to investigate these failures. Looks like this is becuase the contexts are not synchronized
-// while (true) {
- Map diffs = context.getPropertyDifferences();
-// try {
- for (Iterator iter = diffs.keySet().iterator(); iter.hasNext();) {
- String key = (String) iter.next();
- Object prop = context.getPropertyNonReplicable(key);
+ // First check whether it is serializable
+ if (prop instanceof Serializable) {
- // First check whether it is serializable
- if (prop instanceof Serializable) {
-
- // Next check whether it matches an excluded pattern
- if (!isExcluded(key,
- context.getClass().getName(),
- excludedPropertyPatterns)) {
- log.debug("sending property =" + key + "-" + prop);
- PropertyDifference diff = (PropertyDifference) diffs.get(key);
- diff.setValue(prop);
- updateCmd.addProperty(diff);
+ // Next check whether it matches an excluded pattern
+ if (!isExcluded(key,
+ context.getClass().getName(),
+ excludedPropertyPatterns)) {
+ log.debug("sending property =" + key + "-" + prop);
+ PropertyDifference diff = (PropertyDifference) diffs.get(key);
+ diff.setValue(prop);
+ updateCmd.addProperty(diff);
+ }
}
}
}
-// break;
-// } catch (Exception ignored) {
-// }
-// }
} else {
- for (Iterator iter = context.getPropertyNames(); iter.hasNext();) {
- String key = (String) iter.next();
- Object prop = context.getPropertyNonReplicable(key);
- if (prop instanceof Serializable) { // First check whether it is serializable
+ synchronized (context) {
+ for (Iterator iter = context.getPropertyNames(); iter.hasNext();) {
+ String key = (String) iter.next();
+ Object prop = context.getPropertyNonReplicable(key);
+ if (prop instanceof Serializable) { // First check whether it is serializable
- // Next check whether it matches an excluded pattern
- if (!isExcluded(key, context.getClass().getName(), excludedPropertyPatterns)) {
- log.debug("sending property =" + key + "-" + prop);
- PropertyDifference diff = new PropertyDifference(key, prop, false);
- updateCmd.addProperty(diff);
+ // Next check whether it matches an excluded pattern
+ if (!isExcluded(key, context.getClass().getName(), excludedPropertyPatterns)) {
+ log.debug("sending property =" + key + "-" + prop);
+ PropertyDifference diff = new PropertyDifference(key, prop, false);
+ updateCmd.addProperty(diff);
+ }
}
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?view=diff&rev=549095&r1=549094&r2=549095
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Wed Jun 20 05:53:56 2007
@@ -31,7 +31,6 @@
import org.apache.catalina.tribes.Member;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import sun.misc.Queue;
import java.io.Serializable;
@@ -43,17 +42,6 @@
private TribesControlCommandProcessor controlCommandProcessor;
private ChannelSender channelSender;
- /**
- * The messages received are enqued. Another thread, messageProcessor, will
- * process these messages in the order that they were received.
- */
- private final Queue cmdQueue = new Queue();
-
- /**
- * The thread which picks up messages from the cmdQueue and processes them.
- */
- private Thread messageProcessor;
-
private ConfigurationContext configurationContext;
public ChannelListener(ConfigurationContext configurationContext,
@@ -66,7 +54,6 @@
this.controlCommandProcessor = controlCommandProcessor;
this.channelSender = sender;
this.configurationContext = configurationContext;
-// startMessageProcessor();
}
public void setContextManager(DefaultContextManager contextManager) {
@@ -95,41 +82,13 @@
return;
}
log.debug("RECEIVED MESSAGE " + msg + " from " + TribesUtil.getHost(sender));
-
- // Need to process ACKs as soon as they are received since otherwise,
- // unnecessary retransmissions will take place
- /*if (msg instanceof AckCommand) {
- try {
- controlCommandProcessor.process((AckCommand) msg, sender);
- } catch (Exception e) {
- log.error(e);
- }
- return;
- }*/
-
try {
processMessage(msg,sender);
} catch (Exception e) {
log.error(e);
}
-
- // Add the commands to be precessed to the cmdQueue
- /*synchronized (cmdQueue) {
- cmdQueue.enqueue(new MemberMessage(msg, sender));
- }
- if (!messageProcessor.isAlive()) {
- startMessageProcessor();
- }*/
- }
-
- private void startMessageProcessor() {
- messageProcessor = new Thread(new MessageProcessor(), "ClusteringInComingMessageProcessor");
- messageProcessor.setDaemon(true);
- messageProcessor.setPriority(Thread.MAX_PRIORITY);
- messageProcessor.start();
}
-
private void processMessage(Serializable msg, Member sender) throws ClusteringFault {
if (msg instanceof ContextClusteringCommand && contextManager != null) {
ContextClusteringCommand ctxCmd = (ContextClusteringCommand) msg;
@@ -150,49 +109,6 @@
} else if (msg instanceof ControlCommand && controlCommandProcessor != null) {
controlCommandProcessor.process((ControlCommand) msg,
sender);
- }
- }
-
- /**
- * A container to hold a message and its sender
- */
- private class MemberMessage {
- private Serializable message;
- private Member sender;
-
- public MemberMessage(Serializable msg, Member sender) {
- this.message = msg;
- this.sender = sender;
- }
-
- public Serializable getMessage() {
- return message;
- }
-
- public Member getSender() {
- return sender;
- }
- }
-
- /**
- * A processor which continuously polls for messages in the cmdQueue and processes them
- */
- private class MessageProcessor implements Runnable {
- public void run() {
- while (true) {
- MemberMessage memberMessage = null;
- try {
- if (!cmdQueue.isEmpty()) {
- memberMessage = (MemberMessage) cmdQueue.dequeue();
- } else {
- Thread.sleep(1);
- continue;
- }
- processMessage(memberMessage.getMessage(), memberMessage.getSender());
- } catch (Throwable e) {
- log.error("Could not process message ", e);
- }
- }
}
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?view=diff&rev=549095&r1=549094&r2=549095
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Wed Jun 20 05:53:56 2007
@@ -36,6 +36,7 @@
// Keep retrying, since at the point of trying to send the msg, a member may leave the group
// causing a view change. All nodes in a view should get the msg
+ //TODO: Sometimes Tribes ncorrectly detects that a member has left a group
while (true) {
if (channel.getMembers().length > 0) {
try {
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?view=diff&rev=549095&r1=549094&r2=549095
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Wed Jun 20 05:53:56 2007
@@ -116,9 +116,10 @@
mcastProps.setProperty("tcpListenPort", "4000");
mcastProps.setProperty("tcpListenHost", "127.0.0.1");*/
-// TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
-// tcpFailureDetector.setPrevious(nbc);
-// channel.addInterceptor(tcpFailureDetector);
+ /*TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
+ tcpFailureDetector.setPrevious(nbc);
+ channel.addInterceptor(tcpFailureDetector);
+ tcpFailureDetector.*/
channel.addChannelListener(channelListener);
TribesMembershipListener membershipListener = new TribesMembershipListener();
@@ -241,12 +242,5 @@
if (channelListener != null) {
channelListener.setConfigurationContext(configurationContext);
}
- }
-
- public int getMemberCount() {
- if (channel != null) {
- return channel.getMembers().length;
- }
- return 0;
}
}
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java?view=diff&rev=549095&r1=549094&r2=549095
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java Wed Jun 20 05:53:56 2007
@@ -67,10 +67,4 @@
*/
void setConfigurationContext(ConfigurationContext configurationContext);
- /**
- * Get the total number of members in the cluster
- *
- * @return The total number of members in the cluster
- */
- int getMemberCount();
}
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java?view=diff&rev=549095&r1=549094&r2=549095
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java Wed Jun 20 05:53:56 2007
@@ -44,6 +44,7 @@
replicateState(msgContext);
} catch (Exception e) {
String message = "Could not replicate the state";
+ log.error(message, e);
throw new ClusteringFault(message, e);
}
}
@@ -57,6 +58,7 @@
replicateState(abstractContext);
} catch (Exception e) {
String message = "Could not replicate the state";
+ log.error(message, e);
throw new ClusteringFault(message, e);
}
}
@@ -73,8 +75,7 @@
ClusterManager clusterManager =
abstractContext.getRootContext().getAxisConfiguration().getClusterManager();
return clusterManager != null &&
- clusterManager.getContextManager() != null &&
- clusterManager.getMemberCount() != 0;
+ clusterManager.getContextManager() != null;
}
private static void replicateState(AbstractContext abstractContext) throws ClusteringFault {
@@ -160,7 +161,7 @@
Long tts =
(Long) configCtx.getPropertyNonReplicable(ClusteringConstants.TIME_TO_SEND);
if (tts == null) {
- Thread.sleep(40);
+ Thread.sleep(5);
} else if (tts.longValue() >= 0) {
Thread.sleep(tts.longValue() + 5); // Time to recv ACK + time in queue & processing replication request
}
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/context/AbstractContext.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/context/AbstractContext.java?view=diff&rev=549095&r1=549094&r2=549095
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/context/AbstractContext.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/context/AbstractContext.java Wed Jun 20 05:53:56 2007
@@ -21,7 +21,6 @@
import org.apache.axis2.clustering.ClusterManager;
import org.apache.axis2.clustering.context.Replicator;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -129,13 +128,12 @@
addPropertyDifference(key);
}
- private void addPropertyDifference(String key) {
+ private synchronized void addPropertyDifference(String key) {
// Add the property differences only if Context replication is enabled,
// and there are members in the cluster
ClusterManager clusterManager = getRootContext().getAxisConfiguration().getClusterManager();
if (clusterManager != null &&
- clusterManager.getContextManager() != null &&
- clusterManager.getMemberCount() != 0) {
+ clusterManager.getContextManager() != null) {
propertyDifferences.put(key, new PropertyDifference(key, false));
}
}
@@ -160,7 +158,7 @@
*
* @param key
*/
- public void removeProperty(String key) {
+ public synchronized void removeProperty(String key) {
if (properties != null) {
properties.remove(key);
}
@@ -174,7 +172,7 @@
*
* @param key
*/
- public void removePropertyNonReplicable(String key) {
+ public synchronized void removePropertyNonReplicable(String key) {
if (properties != null) {
properties.remove(key);
}
@@ -186,8 +184,8 @@
*
* @return The property differences
*/
- public Map getPropertyDifferences() {
- return Collections.unmodifiableMap(propertyDifferences);
+ public synchronized Map getPropertyDifferences() {
+ return propertyDifferences;
}
/**
@@ -195,7 +193,7 @@
* it should call this method to avoid retransmitting stuff that has already
* been sent.
*/
- public void clearPropertyDifferences() {
+ public synchronized void clearPropertyDifferences() {
propertyDifferences.clear();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org