You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2007/06/20 14:53:57 UTC

svn commit: r549095 - in /webservices/axis2/trunk/java/modules: clustering/src/org/apache/axis2/clustering/context/ clustering/src/org/apache/axis2/clustering/tribes/ kernel/src/org/apache/axis2/clustering/ kernel/src/org/apache/axis2/clustering/contex...

Author: azeez
Date: Wed Jun 20 05:53:56 2007
New Revision: 549095

URL: http://svn.apache.org/viewvc?view=rev&rev=549095
Log:
Fixing some synchronization issues with clustering.


Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/context/AbstractContext.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java?view=diff&rev=549095&r1=549094&r2=549095
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java Wed Jun 20 05:53:56 2007
@@ -107,8 +107,9 @@
             }
         }
 
-        //TODO: This is causing a concurrency issue. Need to look into this
-        context.clearPropertyDifferences(); // Once we send the diffs, we should clear the diffs
+        synchronized (context) {
+            context.clearPropertyDifferences(); // Once we send the diffs, we should clear the diffs
+        }
         return cmd;
     }
 
@@ -124,45 +125,40 @@
                                        Map excludedPropertyPatterns,
                                        boolean includeAllProperties) {
         if (!includeAllProperties) {
+            synchronized (context) {
+                Map diffs = context.getPropertyDifferences();
+                for (Iterator iter = diffs.keySet().iterator(); iter.hasNext();) {
+                    String key = (String) iter.next();
+                    Object prop = context.getPropertyNonReplicable(key);
 
-            // Sometimes, there can be failures, so if an exception occurs, we retry
-            // TODO: Need to investigate these failures. Looks like this is becuase the contexts are not synchronized
-//            while (true) {
-            Map diffs = context.getPropertyDifferences();
-//                try {
-            for (Iterator iter = diffs.keySet().iterator(); iter.hasNext();) {
-                String key = (String) iter.next();
-                Object prop = context.getPropertyNonReplicable(key);
+                    // First check whether it is serializable
+                    if (prop instanceof Serializable) {
 
-                // First check whether it is serializable
-                if (prop instanceof Serializable) {
-
-                    // Next check whether it matches an excluded pattern
-                    if (!isExcluded(key,
-                                    context.getClass().getName(),
-                                    excludedPropertyPatterns)) {
-                        log.debug("sending property =" + key + "-" + prop);
-                        PropertyDifference diff = (PropertyDifference) diffs.get(key);
-                        diff.setValue(prop);
-                        updateCmd.addProperty(diff);
+                        // Next check whether it matches an excluded pattern
+                        if (!isExcluded(key,
+                                        context.getClass().getName(),
+                                        excludedPropertyPatterns)) {
+                            log.debug("sending property =" + key + "-" + prop);
+                            PropertyDifference diff = (PropertyDifference) diffs.get(key);
+                            diff.setValue(prop);
+                            updateCmd.addProperty(diff);
+                        }
                     }
                 }
             }
-//                    break;
-//                } catch (Exception ignored) {
-//                }
-//            }
         } else {
-            for (Iterator iter = context.getPropertyNames(); iter.hasNext();) {
-                String key = (String) iter.next();
-                Object prop = context.getPropertyNonReplicable(key);
-                if (prop instanceof Serializable) { // First check whether it is serializable
+            synchronized (context) {
+                for (Iterator iter = context.getPropertyNames(); iter.hasNext();) {
+                    String key = (String) iter.next();
+                    Object prop = context.getPropertyNonReplicable(key);
+                    if (prop instanceof Serializable) { // First check whether it is serializable
 
-                    // Next check whether it matches an excluded pattern
-                    if (!isExcluded(key, context.getClass().getName(), excludedPropertyPatterns)) {
-                        log.debug("sending property =" + key + "-" + prop);
-                        PropertyDifference diff = new PropertyDifference(key, prop, false);
-                        updateCmd.addProperty(diff);
+                        // Next check whether it matches an excluded pattern
+                        if (!isExcluded(key, context.getClass().getName(), excludedPropertyPatterns)) {
+                            log.debug("sending property =" + key + "-" + prop);
+                            PropertyDifference diff = new PropertyDifference(key, prop, false);
+                            updateCmd.addProperty(diff);
+                        }
                     }
                 }
             }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?view=diff&rev=549095&r1=549094&r2=549095
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Wed Jun 20 05:53:56 2007
@@ -31,7 +31,6 @@
 import org.apache.catalina.tribes.Member;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import sun.misc.Queue;
 
 import java.io.Serializable;
 
@@ -43,17 +42,6 @@
     private TribesControlCommandProcessor controlCommandProcessor;
     private ChannelSender channelSender;
 
-    /**
-     * The messages received are enqued. Another thread, messageProcessor, will
-     * process these messages in the order that they were received.
-     */
-    private final Queue cmdQueue = new Queue();
-
-    /**
-     * The thread which picks up messages from the cmdQueue and processes them.
-     */
-    private Thread messageProcessor;
-
     private ConfigurationContext configurationContext;
 
     public ChannelListener(ConfigurationContext configurationContext,
@@ -66,7 +54,6 @@
         this.controlCommandProcessor = controlCommandProcessor;
         this.channelSender = sender;
         this.configurationContext = configurationContext;
-//        startMessageProcessor();
     }
 
     public void setContextManager(DefaultContextManager contextManager) {
@@ -95,41 +82,13 @@
             return;
         }
         log.debug("RECEIVED MESSAGE " + msg + " from " + TribesUtil.getHost(sender));
-
-        // Need to process ACKs as soon as they are received since otherwise,
-        // unnecessary retransmissions will take place
-        /*if (msg instanceof AckCommand) {
-            try {
-                controlCommandProcessor.process((AckCommand) msg, sender);
-            } catch (Exception e) {
-                log.error(e);
-            }
-            return;
-        }*/
-
         try {
             processMessage(msg,sender);
         } catch (Exception e) {
             log.error(e);
         }
-
-        // Add the commands to be precessed to the cmdQueue
-        /*synchronized (cmdQueue) {
-            cmdQueue.enqueue(new MemberMessage(msg, sender));
-        }
-        if (!messageProcessor.isAlive()) {
-            startMessageProcessor();
-        }*/
-    }
-
-    private void startMessageProcessor() {
-        messageProcessor = new Thread(new MessageProcessor(), "ClusteringInComingMessageProcessor");
-        messageProcessor.setDaemon(true);
-        messageProcessor.setPriority(Thread.MAX_PRIORITY);
-        messageProcessor.start();
     }
 
-
     private void processMessage(Serializable msg, Member sender) throws ClusteringFault {
         if (msg instanceof ContextClusteringCommand && contextManager != null) {
             ContextClusteringCommand ctxCmd = (ContextClusteringCommand) msg;
@@ -150,49 +109,6 @@
         } else if (msg instanceof ControlCommand && controlCommandProcessor != null) {
             controlCommandProcessor.process((ControlCommand) msg,
                                             sender);
-        }
-    }
-
-    /**
-     * A container to hold a message and its sender
-     */
-    private class MemberMessage {
-        private Serializable message;
-        private Member sender;
-
-        public MemberMessage(Serializable msg, Member sender) {
-            this.message = msg;
-            this.sender = sender;
-        }
-
-        public Serializable getMessage() {
-            return message;
-        }
-
-        public Member getSender() {
-            return sender;
-        }
-    }
-
-    /**
-     * A processor which continuously polls for messages in the cmdQueue and processes them
-     */
-    private class MessageProcessor implements Runnable {
-        public void run() {
-            while (true) {
-                MemberMessage memberMessage = null;
-                try {
-                    if (!cmdQueue.isEmpty()) {
-                        memberMessage = (MemberMessage) cmdQueue.dequeue();
-                    } else {
-                        Thread.sleep(1);
-                        continue;
-                    }
-                    processMessage(memberMessage.getMessage(), memberMessage.getSender());
-                } catch (Throwable e) {
-                    log.error("Could not process message ", e);
-                }
-            }
         }
     }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?view=diff&rev=549095&r1=549094&r2=549095
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Wed Jun 20 05:53:56 2007
@@ -36,6 +36,7 @@
 
         // Keep retrying, since at the point of trying to send the msg, a member may leave the group
         // causing a view change. All nodes in a view should get the msg
+        //TODO: Sometimes Tribes incorrectly detects that a member has left a group
         while (true) {
             if (channel.getMembers().length > 0) {
                 try {

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?view=diff&rev=549095&r1=549094&r2=549095
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Wed Jun 20 05:53:56 2007
@@ -116,9 +116,10 @@
             mcastProps.setProperty("tcpListenPort", "4000");
             mcastProps.setProperty("tcpListenHost", "127.0.0.1");*/
 
-//            TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
-//            tcpFailureDetector.setPrevious(nbc);
-//            channel.addInterceptor(tcpFailureDetector);
+            /*TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
+            tcpFailureDetector.setPrevious(nbc);
+            channel.addInterceptor(tcpFailureDetector);
+            tcpFailureDetector.*/
 
             channel.addChannelListener(channelListener);
             TribesMembershipListener membershipListener = new TribesMembershipListener();
@@ -241,12 +242,5 @@
         if (channelListener != null) {
             channelListener.setConfigurationContext(configurationContext);
         }
-    }
-
-    public int getMemberCount() {
-        if (channel != null) {
-            return channel.getMembers().length;
-        }
-        return 0;
     }
 }

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java?view=diff&rev=549095&r1=549094&r2=549095
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java Wed Jun 20 05:53:56 2007
@@ -67,10 +67,4 @@
      */
     void setConfigurationContext(ConfigurationContext configurationContext);
 
-    /**
-     * Get the total number of members in the cluster
-     * 
-     * @return The total number of members in the cluster
-     */
-    int getMemberCount();
 }

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java?view=diff&rev=549095&r1=549094&r2=549095
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java Wed Jun 20 05:53:56 2007
@@ -44,6 +44,7 @@
             replicateState(msgContext);
         } catch (Exception e) {
             String message = "Could not replicate the state";
+            log.error(message, e);
             throw new ClusteringFault(message, e);
         }
     }
@@ -57,6 +58,7 @@
             replicateState(abstractContext);
         } catch (Exception e) {
             String message = "Could not replicate the state";
+            log.error(message, e);
             throw new ClusteringFault(message, e);
         }
     }
@@ -73,8 +75,7 @@
         ClusterManager clusterManager =
                 abstractContext.getRootContext().getAxisConfiguration().getClusterManager();
         return clusterManager != null &&
-               clusterManager.getContextManager() != null &&
-               clusterManager.getMemberCount() != 0;
+               clusterManager.getContextManager() != null;
     }
 
     private static void replicateState(AbstractContext abstractContext) throws ClusteringFault {
@@ -160,7 +161,7 @@
                 Long tts =
                         (Long) configCtx.getPropertyNonReplicable(ClusteringConstants.TIME_TO_SEND);
                 if (tts == null) {
-                    Thread.sleep(40);
+                    Thread.sleep(5);
                 } else if (tts.longValue() >= 0) {
                     Thread.sleep(tts.longValue() + 5); // Time to recv ACK + time in queue & processing replication request
                 }

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/context/AbstractContext.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/context/AbstractContext.java?view=diff&rev=549095&r1=549094&r2=549095
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/context/AbstractContext.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/context/AbstractContext.java Wed Jun 20 05:53:56 2007
@@ -21,7 +21,6 @@
 import org.apache.axis2.clustering.ClusterManager;
 import org.apache.axis2.clustering.context.Replicator;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -129,13 +128,12 @@
         addPropertyDifference(key);
     }
 
-    private void addPropertyDifference(String key) {
+    private synchronized void addPropertyDifference(String key) {
         // Add the property differences only if Context replication is enabled,
         // and there are members in the cluster
         ClusterManager clusterManager = getRootContext().getAxisConfiguration().getClusterManager();
         if (clusterManager != null &&
-            clusterManager.getContextManager() != null &&
-            clusterManager.getMemberCount() != 0) {
+            clusterManager.getContextManager() != null) {
             propertyDifferences.put(key, new PropertyDifference(key, false));
         }
     }
@@ -160,7 +158,7 @@
      *
      * @param key
      */
-    public void removeProperty(String key) {
+    public synchronized void removeProperty(String key) {
         if (properties != null) {
             properties.remove(key);
         }
@@ -174,7 +172,7 @@
      *
      * @param key
      */
-    public void removePropertyNonReplicable(String key) {
+    public synchronized void removePropertyNonReplicable(String key) {
         if (properties != null) {
             properties.remove(key);
         }
@@ -186,8 +184,8 @@
      *
      * @return The property differences
      */
-    public Map getPropertyDifferences() {
-        return Collections.unmodifiableMap(propertyDifferences);
+    public synchronized Map getPropertyDifferences() {
+        return propertyDifferences;
     }
 
     /**
@@ -195,7 +193,7 @@
      * it should call this method to avoid retransmitting stuff that has already
      * been sent.
      */
-    public void clearPropertyDifferences() {
+    public synchronized void clearPropertyDifferences() {
         propertyDifferences.clear();
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org