You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by az...@apache.org on 2008/01/11 18:17:24 UTC

svn commit: r611239 - in /webservices/axis2/trunk/java/modules: clustering/src/org/apache/axis2/clustering/context/ clustering/src/org/apache/axis2/clustering/context/commands/ clustering/src/org/apache/axis2/clustering/control/ clustering/src/org/apac...

Author: azeez
Date: Fri Jan 11 09:17:19 2008
New Revision: 611239

URL: http://svn.apache.org/viewvc?rev=611239&view=rev
Log:
1. Moving the ACK mechanism down to the Tribes layer - this leads to a reduction in the number of messages we have to send across
2. Introducing an AtMostOnceInterceptor so that duplicate messages can be handled at the Tribes layer itself - there is no need to deserialize the message to find out whether it's a duplicate, since we can use the
uuid appended by Tribes


Added:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java
Removed:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/AckCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java
Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java
    webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java
    webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ObjectSerializationTest.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/MessageSender.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextClusteringCommand.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java Fri Jan 11 09:17:19 2008
@@ -26,7 +26,6 @@
 import org.apache.axis2.clustering.context.commands.UpdateContextCommand;
 import org.apache.axis2.clustering.context.commands.UpdateServiceContextCommand;
 import org.apache.axis2.clustering.context.commands.UpdateServiceGroupContextCommand;
-import org.apache.axis2.clustering.tribes.AckManager;
 import org.apache.axis2.context.AbstractContext;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.PropertyDifference;
@@ -64,8 +63,6 @@
                 commands.add(cmd);
             }
         }
-        collection.setUniqueId(UUIDGenerator.getUUID());
-        AckManager.addInitialAcknowledgement(collection);
         return collection;
     }
 
@@ -82,15 +79,12 @@
 
         UpdateContextCommand cmd = toUpdateContextCommand(context);
         if (cmd != null) {
-            cmd.setUniqueId(UUIDGenerator.getUUID());
             fillProperties(cmd,
                            context,
                            excludedPropertyPatterns,
                            includeAllProperties);
             if (cmd.isPropertiesEmpty()) {
                 cmd = null;
-            } else {
-                AckManager.addInitialAcknowledgement(cmd);
             }
         }
 
@@ -107,13 +101,10 @@
 
         UpdateContextCommand cmd = toUpdateContextCommand(context);
         if (cmd != null) {
-            cmd.setUniqueId(UUIDGenerator.getUUID());
             fillProperties(cmd, context, propertyNames);
             if (cmd.isPropertiesEmpty()) {
                 cmd = null;
-            } else {
-                AckManager.addInitialAcknowledgement(cmd);
-            }
+            } 
         }
 
         synchronized (context) {
@@ -272,7 +263,6 @@
         if (abstractContext instanceof ServiceGroupContext) {
             ServiceGroupContext sgCtx = (ServiceGroupContext) abstractContext;
             DeleteServiceGroupContextCommand cmd = new DeleteServiceGroupContextCommand();
-            cmd.setUniqueId(UUIDGenerator.getUUID());
             cmd.setServiceGroupContextId(sgCtx.getId());
 
             return cmd;

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java Fri Jan 11 09:17:19 2008
@@ -22,9 +22,7 @@
 import org.apache.axiom.om.OMElement;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.clustering.ClusteringFault;
-import org.apache.axis2.clustering.ClusteringConstants;
 import org.apache.axis2.clustering.context.commands.ContextClusteringCommandCollection;
-import org.apache.axis2.clustering.tribes.AckManager;
 import org.apache.axis2.clustering.tribes.ChannelSender;
 import org.apache.axis2.context.AbstractContext;
 import org.apache.axis2.context.ConfigurationContext;
@@ -32,7 +30,11 @@
 import org.apache.axis2.context.ServiceGroupContext;
 import org.apache.axis2.description.Parameter;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 public class DefaultContextManager implements ContextManager {
 
@@ -42,7 +44,6 @@
     private Map parameters = new HashMap();
 
     private ChannelSender sender;
-    private ContextReplicationProcessor processor = new ContextReplicationProcessor();
 
     private Map excludedReplicationPatterns = new HashMap();
 
@@ -54,41 +55,35 @@
     public DefaultContextManager() {
     }
 
-    public String updateContext(AbstractContext context) throws ClusteringFault {
+    public void updateContext(AbstractContext context) throws ClusteringFault {
         ContextClusteringCommand cmd =
                 ContextClusteringCommandFactory.getUpdateCommand(context,
                                                                  excludedReplicationPatterns,
                                                                  false);
         if (cmd != null) {
-            processor.process(cmd);
-            return cmd.getUniqueId();
+            sender.sendToGroup(cmd);
         }
-        return null;
     }
 
-    public String updateContext(AbstractContext context,
+    public void updateContext(AbstractContext context,
                                 String[] propertyNames) throws ClusteringFault {
         ContextClusteringCommand cmd =
                 ContextClusteringCommandFactory.getUpdateCommand(context, propertyNames);
         if (cmd != null) {
-            processor.process(cmd);
-            return cmd.getUniqueId();
+            sender.sendToGroup(cmd);
         }
-        return null;
     }
 
-    public String updateContexts(AbstractContext[] contexts) throws ClusteringFault {
+    public void updateContexts(AbstractContext[] contexts) throws ClusteringFault {
         ContextClusteringCommandCollection cmd =
                 ContextClusteringCommandFactory.getCommandCollection(contexts,
                                                                      excludedReplicationPatterns);
-        processor.process(cmd);
-        return cmd.getUniqueId();
+        sender.sendToGroup(cmd);
     }
 
-    public String removeContext(AbstractContext context) throws ClusteringFault {
+    public void removeContext(AbstractContext context) throws ClusteringFault {
         ContextClusteringCommand cmd = ContextClusteringCommandFactory.getRemoveCommand(context);
-        processor.process(cmd);
-        return cmd.getUniqueId();
+        sender.sendToGroup(cmd);
     }
 
     public boolean isContextClusterable(AbstractContext context) {
@@ -97,14 +92,6 @@
                (context instanceof ServiceGroupContext);
     }
 
-    public boolean isMessageAcknowledged(String messageUniqueId) throws ClusteringFault {
-        return AckManager.isMessageAcknowledged(messageUniqueId, sender);
-    }
-
-    public void process(ContextClusteringCommand command) throws ClusteringFault {
-        command.execute(configContext);
-    }
-
     public void setContextManagerListener(ContextManagerListener listener) {
         this.listener = listener;
         if (configContext != null) {
@@ -114,7 +101,7 @@
 
     public void setConfigurationContext(ConfigurationContext configurationContext) {
         this.configContext = configurationContext;
-        if(listener != null){
+        if (listener != null) {
             listener.setConfigurationContext(configContext);
         }
     }
@@ -156,43 +143,4 @@
         throw new UnsupportedOperationException();
     }
     // ---------------------------------------------------------------------------------------------
-
-    private class ContextReplicationProcessor {
-        public void process(final ContextClusteringCommand cmd) throws ClusteringFault {
-
-            // If the sender is NULL, it means the TribesClusterManager is still being initialized
-            // So we need to busy wait.
-            if (sender == null) {
-                Thread processorThread = new Thread("ProcessorThread") {
-                    public void run() {
-                        do {
-                            try {
-                                Thread.sleep(300);
-                            } catch (InterruptedException e) {
-                                e.printStackTrace();
-                            }
-                        } while (sender == null);
-                        try {
-                            long tts = sender.sendToGroup(cmd);
-                            configContext.setNonReplicableProperty(ClusteringConstants.TIME_TO_SEND,
-                                                                   new Long(tts));
-                        } catch (ClusteringFault clusteringFault) {
-                            AckManager.removeMessage(cmd.getUniqueId());
-                            throw new RuntimeException(clusteringFault);
-                        }
-                    }
-                };
-                processorThread.start();
-            } else {
-                try {
-                    long tts = sender.sendToGroup(cmd);
-                    configContext.setNonReplicableProperty(ClusteringConstants.TIME_TO_SEND,
-                                                           new Long(tts));
-                } catch (ClusteringFault clusteringFault) {
-                    AckManager.removeMessage(cmd.getUniqueId());
-                    throw clusteringFault;
-                }
-            }
-        }
-    }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java Fri Jan 11 09:17:19 2008
@@ -45,6 +45,6 @@
     }
 
     public String toString() {
-        return "ContextClusteringCommandCollection(" + uniqueId + ")";
+        return "ContextClusteringCommandCollection";
     }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java Fri Jan 11 09:17:19 2008
@@ -31,6 +31,6 @@
     }
 
     public String toString() {
-        return "UpdateConfigurationContextCommand(" + uniqueId + ")";
+        return "UpdateConfigurationContextCommand";
     }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java Fri Jan 11 09:17:19 2008
@@ -102,6 +102,6 @@
     }
 
     public String toString() {
-        return "UpdateServiceContextCommand(" + uniqueId + ")";
+        return "UpdateServiceContextCommand";
     }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java Fri Jan 11 09:17:19 2008
@@ -73,6 +73,6 @@
     }
 
     public String toString() {
-        return "UpdateServiceGroupContextCommand(" + uniqueId + ")";
+        return "UpdateServiceGroupContextCommand";
     }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java Fri Jan 11 09:17:19 2008
@@ -44,8 +44,10 @@
 
         // Run this code only if this node is not already initialized
         if (configContext.
-                getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
+                getPropertyNonReplicable(ClusteringConstants.RECD_CONFIG_INIT_MSG) == null) {
             log.info("Received configuration initialization message");
+            configContext.
+                setNonReplicableProperty(ClusteringConstants.RECD_CONFIG_INIT_MSG, "true");
             if (serviceGroups != null) {
 
                 // Load all the service groups that are sent by the neighbour

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java Fri Jan 11 09:17:19 2008
@@ -34,17 +34,17 @@
 
     private ContextClusteringCommand[] commands;
 
-    public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
+    public void execute(ConfigurationContext configContext) throws ClusteringFault {
 
         // Run this code only if this node is not already initialized
-        if (configurationContext.
-                getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
+        if (configContext.
+                getPropertyNonReplicable(ClusteringConstants.RECD_STATE_INIT_MSG) == null) {
+            configContext.
+                setNonReplicableProperty(ClusteringConstants.RECD_STATE_INIT_MSG, "true");
             log.info("Received state initialization message");
-            configurationContext.
-                    setNonReplicableProperty(ClusteringConstants.CLUSTER_INITIALIZED, "true");
             if (commands != null) {
                 for (int i = 0; i < commands.length; i++) {
-                    commands[i].execute(configurationContext);
+                    commands[i].execute(configContext);
                 }
             }
         }

Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java?rev=611239&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java Fri Jan 11 09:17:19 2008
@@ -0,0 +1,122 @@
+/*                                                                             
+ * Copyright 2004,2005 The Apache Software Foundation.                         
+ *                                                                             
+ * Licensed under the Apache License, Version 2.0 (the "License");             
+ * you may not use this file except in compliance with the License.            
+ * You may obtain a copy of the License at                                     
+ *                                                                             
+ *      http://www.apache.org/licenses/LICENSE-2.0                             
+ *                                                                             
+ * Unless required by applicable law or agreed to in writing, software         
+ * distributed under the License is distributed on an "AS IS" BASIS,           
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.    
+ * See the License for the specific language governing permissions and         
+ * limitations under the License.                                              
+ */
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelInterceptor;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ByteMessage;
+import org.apache.catalina.tribes.util.UUIDGenerator;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.membership.Membership;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.axis2.engine.AxisConfiguration;
+import org.apache.axis2.description.AxisServiceGroup;
+import org.apache.axis2.description.AxisModule;
+
+import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.Timer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ArrayList;
+import java.net.Socket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.ConnectException;
+import java.io.Serializable;
+
+/**
+ * Message interceptor for handling at-most-once message processing semantics.
+ * Each message is remembered for TIMEOUT ms; a message equal to a remembered one is a duplicate
+ * and is not passed on. NOTE(review): relies on ChannelMessage equals/hashCode - confirm that
+ * the Tribes implementation compares the unique id.
+ */
+public class AtMostOnceInterceptor extends ChannelInterceptorBase {
+
+    private static Log log = LogFactory.getLog(AtMostOnceInterceptor.class);
+
+    /**
+     * Received message -> arrival time (Long, millis). Static, so it is shared by every instance
+     * and every cleanup timer thread; all access must be synchronized on the map itself.
+     */
+    private static final Map receivedMessages = new HashMap();
+
+    /**
+     * The time a message lives in the receivedMessages Map
+     */
+    private static final int TIMEOUT = 5 * 60 * 1000;
+
+    private Channel channel;
+
+    /**
+     * @param channel the Channel to associate with this interceptor (it is only stored here)
+     */
+    public AtMostOnceInterceptor(Channel channel) {
+        this();
+        this.channel = channel;
+    }
+
+    /**
+     * Starts a timer which, every TIMEOUT ms, evicts entries that are at least TIMEOUT ms old.
+     */
+    public AtMostOnceInterceptor() {
+        TimerTask cleanupTask = new TimerTask() {
+            public void run() {
+                long now = System.currentTimeMillis();
+                synchronized (receivedMessages) {
+                    for (Iterator iter = receivedMessages.values().iterator(); iter.hasNext();) {
+                        if (now - ((Long) iter.next()).longValue() >= TIMEOUT) {
+                            iter.remove();
+                        }
+                    }
+                }
+            }
+        };
+        // Daemon timer: the cleanup thread must not prevent the JVM from shutting down
+        new Timer(true).scheduleAtFixedRate(cleanupTask, TIMEOUT, TIMEOUT);
+    }
+
+    /**
+     * Passes a message on only the first time it is seen within the retention period: the first
+     * arrival is recorded and forwarded to super.messageReceived, a repeat is logged and dropped.
+     */
+    public void messageReceived(ChannelMessage msg) {
+        boolean isDuplicate;
+        synchronized (receivedMessages) {
+            isDuplicate = receivedMessages.containsKey(msg);
+            if (!isDuplicate) {  // If it is a new message, keep track of it
+                receivedMessages.put(msg, new Long(System.currentTimeMillis()));
+            }
+        }
+        if (isDuplicate) {  // Discard it, i.e. don't call super.messageReceived
+            log.info("Duplicate message received from " + TribesUtil.getHost(msg.getAddress()));
+        } else {
+            super.messageReceived(msg);
+        }
+    }
+}

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Fri Jan 11 09:17:19 2008
@@ -25,9 +25,6 @@
 import org.apache.axis2.clustering.configuration.DefaultConfigurationManager;
 import org.apache.axis2.clustering.context.ContextClusteringCommand;
 import org.apache.axis2.clustering.context.DefaultContextManager;
-import org.apache.axis2.clustering.context.commands.ContextClusteringCommandCollection;
-import org.apache.axis2.clustering.context.commands.UpdateContextCommand;
-import org.apache.axis2.clustering.control.AckCommand;
 import org.apache.axis2.clustering.control.ControlCommand;
 import org.apache.axis2.clustering.control.GetConfigurationResponseCommand;
 import org.apache.axis2.clustering.control.GetStateResponseCommand;
@@ -37,54 +34,33 @@
 import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.catalina.tribes.ByteMessage;
 import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.RemoteProcessException;
 import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 
 public class ChannelListener implements org.apache.catalina.tribes.ChannelListener {
     private static final Log log = LogFactory.getLog(ChannelListener.class);
 
-    /**
-     * The time a message lives in the receivedMessages Map
-     */
-    private static final int TIME_TO_LIVE = 5 * 60 * 1000; // 5 mins
-
     private DefaultContextManager contextManager;
     private DefaultConfigurationManager configurationManager;
     private TribesControlCommandProcessor controlCommandProcessor;
-    private ChannelSender channelSender;
 
     private ConfigurationContext configurationContext;
-    private boolean synchronizeAllMembers;
-
-    private Map receivedMessages = new HashMap();
 
     public ChannelListener(ConfigurationContext configurationContext,
                            DefaultConfigurationManager configurationManager,
                            DefaultContextManager contextManager,
-                           TribesControlCommandProcessor controlCommandProcessor,
-                           ChannelSender sender,
-                           boolean synchronizeAllMembers) {
+                           TribesControlCommandProcessor controlCommandProcessor) {
         this.configurationManager = configurationManager;
         this.contextManager = contextManager;
         this.controlCommandProcessor = controlCommandProcessor;
-        this.channelSender = sender;
         this.configurationContext = configurationContext;
-        this.synchronizeAllMembers = synchronizeAllMembers;
-
-        Timer cleanupTimer = new Timer();
-        cleanupTimer.scheduleAtFixedRate(new ReceivedMessageCleanupTask(),
-                                         TIME_TO_LIVE,
-                                         TIME_TO_LIVE);
     }
 
     public void setContextManager(DefaultContextManager contextManager) {
@@ -106,8 +82,7 @@
      * @return boolean
      */
     public boolean accept(Serializable msg, Member sender) {
-        return configurationContext.
-                getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) != null;
+        return true;
     }
 
     /**
@@ -137,8 +112,9 @@
                                           message.length,
                                           (ClassLoader[]) classLoaders.toArray(new ClassLoader[classLoaders.size()]));
         } catch (Exception e) {
-            log.error("Cannot deserialize received message", e);
-            return;
+            String errMsg = "Cannot deserialize received message";
+            log.error(errMsg, e);
+            throw new RemoteProcessException(errMsg, e);
         }
 
         // If the system has not still been intialized, reject all incoming messages, except the
@@ -160,70 +136,20 @@
         try {
             processMessage(msg, sender);
         } catch (Exception e) {
-            log.error("Cannot process message", e);
+            String errMsg = "Cannot process received message";
+            log.error(errMsg, e);
+            throw new RemoteProcessException(errMsg, e);
         }
     }
 
     private void processMessage(Serializable msg, Member sender) throws ClusteringFault {
-        //TODO: Handle ACK implosion?
-
         if (msg instanceof ContextClusteringCommand && contextManager != null) {
             ContextClusteringCommand ctxCmd = (ContextClusteringCommand) msg;
-            String msgId = ctxCmd.getUniqueId();
-
-            // Check for duplicate messages and ignore duplicates in order to support at-most-once semantics
-            if (receivedMessages.containsKey(msgId)) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Received duplicate message " + ctxCmd);
-                }
-                return;
-            }
-            synchronized (receivedMessages) {
-                receivedMessages.put(msgId, new Long(System.currentTimeMillis()));// Let's keep track of the message as well as the time at which it was first received
-            }
-
-            // Process the message
-            contextManager.process(ctxCmd);
-
-            // Sending ACKs for ContextClusteringCommandCollection or
-            // UpdateContextCommand is sufficient
-            if (synchronizeAllMembers) { // Send ACK only if the relevant cluster config parameter is set
-                if (msg instanceof ContextClusteringCommandCollection ||
-                    msg instanceof UpdateContextCommand) {
-                    AckCommand ackCmd = new AckCommand(msgId);
-
-                    // Send the ACK
-                    this.channelSender.sendToMember(ackCmd, sender);
-                }
-            }
-        } else if (msg instanceof ConfigurationClusteringCommand &&
-                   configurationManager != null) {
+            ctxCmd.execute(configurationContext);
+        } else if (msg instanceof ConfigurationClusteringCommand && configurationManager != null) {
             configurationManager.process((ConfigurationClusteringCommand) msg);
         } else if (msg instanceof ControlCommand && controlCommandProcessor != null) {
             controlCommandProcessor.process((ControlCommand) msg, sender);
-        }
-    }
-
-    private class ReceivedMessageCleanupTask extends TimerTask {
-
-        public void run() {
-            List toBeRemoved = new ArrayList();
-            synchronized (receivedMessages) {
-                for (Iterator iter = receivedMessages.keySet().iterator(); iter.hasNext();) {
-                    String msgId = (String) iter.next();
-                    Long recdTime = (Long) receivedMessages.get(msgId);
-                    if (System.currentTimeMillis() - recdTime.longValue() >= TIME_TO_LIVE) {
-                        toBeRemoved.add(msgId);
-                    }
-                }
-                for (Iterator iter = toBeRemoved.iterator(); iter.hasNext();) {
-                    String msgId = (String) iter.next();
-                    receivedMessages.remove(msgId);
-                    if (log.isDebugEnabled()) {
-                        log.debug("Removed message " + msgId + " from received message buffer");
-                    }
-                }
-            }
         }
     }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Fri Jan 11 09:17:19 2008
@@ -39,23 +39,21 @@
     private Log log = LogFactory.getLog(ChannelSender.class);
     private Channel channel;
 
-    public long sendToGroup(ClusteringCommand msg) throws ClusteringFault {
+    public void sendToGroup(ClusteringCommand msg) throws ClusteringFault {
         if (channel == null) {
-            return 0;
+            return;
         }
-        long timeToSend = 0;
-
         Member[] members = MembershipManager.getMembers();
 
         // Keep retrying, since at the point of trying to send the msg, a member may leave the group
         // causing a view change. All nodes in a view should get the msg
         if (members.length > 0) {
             try {
-                long start = System.currentTimeMillis();
-                channel.send(members, toByteMessage(msg), Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
-//                channel.send(members, toByteMessage(msg), Channel.SEND_OPTIONS_USE_ACK);
-                timeToSend = System.currentTimeMillis() - start;
-                log.debug("Sent " + msg + " to group");
+                channel.send(members, toByteMessage(msg),
+                             Channel.SEND_OPTIONS_USE_ACK | Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+                if (log.isDebugEnabled()) {
+                    log.debug("Sent " + msg + " to group");
+                }
             } catch (NotSerializableException e) {
                 String message = "Could not send command message " + msg +
                                  " to group since it is not serializable.";
@@ -76,7 +74,6 @@
                 log.warn(message, e);
             }
         }
-        return timeToSend;
     }
 
     private ByteMessage toByteMessage(ClusteringCommand msg) throws IOException {
@@ -96,21 +93,22 @@
             channel.send(new Member[]{channel.getLocalMember(true)},
                          toByteMessage(msg),
                          Channel.SEND_OPTIONS_USE_ACK);
-            log.debug("Sent " + msg + " to self");
+            if (log.isDebugEnabled()) {
+                log.debug("Sent " + msg + " to self");
+            }
         } catch (Exception e) {
             throw new ClusteringFault(e);
         }
     }
 
-    public long sendToMember(ClusteringCommand cmd, Member member) throws ClusteringFault {
-        long timeToSend = 0;
+    public void sendToMember(ClusteringCommand cmd, Member member) throws ClusteringFault {
         try {
             if (member.isReady()) {
-                long start = System.currentTimeMillis();
-                channel.send(new Member[]{member}, toByteMessage(cmd), Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
-//                channel.send(new Member[]{member}, toByteMessage(cmd), Channel.SEND_OPTIONS_USE_ACK);
-                timeToSend = System.currentTimeMillis() - start;
-                log.debug("Sent " + cmd + " to " + TribesUtil.getHost(member));
+                channel.send(new Member[]{member}, toByteMessage(cmd),
+                             Channel.SEND_OPTIONS_USE_ACK | Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+                if (log.isDebugEnabled()) {
+                    log.debug("Sent " + cmd + " to " + TribesUtil.getHost(member));
+                }
             }
         } catch (NotSerializableException e) {
             String message = "Could not send command message to " + TribesUtil.getHost(member) +
@@ -130,7 +128,6 @@
                              ". Reason " + e.getMessage();
             log.warn(message, e);
         }
-        return timeToSend;
     }
 
     public Channel getChannel() {

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Fri Jan 11 09:17:19 2008
@@ -47,7 +47,6 @@
 import org.apache.catalina.tribes.group.GroupChannel;
 import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor;
 import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
-import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
 import org.apache.catalina.tribes.transport.ReceiverBase;
 import org.apache.catalina.tribes.transport.ReplicationTransmitter;
 import org.apache.commons.logging.Log;
@@ -128,9 +127,7 @@
         channelListener = new ChannelListener(configurationContext,
                                               configurationManager,
                                               contextManager,
-                                              controlCmdProcessor,
-                                              channelSender,
-                                              synchronizeAllMembers());
+                                              controlCmdProcessor);
 
         controlCmdProcessor.setChannelSender(channelSender);
         channel = new GroupChannel();
@@ -159,6 +156,8 @@
         } else {
             domain = "apache.axis2.domain".getBytes();
         }
+
+        // Add a DomainFilterInterceptor
         channel.getMembershipService().setDomain(domain);
         DomainFilterInterceptor dfi = new DomainFilterInterceptor();
         dfi.setDomain(domain);
@@ -186,10 +185,17 @@
        mcastProps.setProperty("tcpListenHost", "127.0.0.1");*/
 
 //        OrderInterceptor orderInterceptor = new OrderInterceptor();
-        
 
+
+        // Add an AtMostOnceInterceptor to support at-most-once message processing semantics
+        AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor(channel);
+        channel.addInterceptor(atMostOnceInterceptor);
+        atMostOnceInterceptor.setPrevious(dfi);
+
+        // Add a reliable failure detector
         TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
-        tcpFailureDetector.setPrevious(dfi);
+//        tcpFailureDetector.setPrevious(dfi);
+        tcpFailureDetector.setPrevious(atMostOnceInterceptor);
         channel.addInterceptor(tcpFailureDetector);
 
         channel.addChannelListener(channelListener);
@@ -211,7 +217,6 @@
         }
         channelSender.setChannel(channel);
 
-//        Member[] members = channel.getMembers();
         log.info("Local Tribes Member " + TribesUtil.getLocalHost(channel));
         TribesUtil.printMembers();
 
@@ -250,6 +255,7 @@
         List sentMembersList = new ArrayList();
         sentMembersList.add(TribesUtil.getLocalHost(channel));
         Member[] members = MembershipManager.getMembers();
+
         while (members.length > 0 &&
                configurationContext.
                        getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null
@@ -261,13 +267,10 @@
                                 MembershipManager.getLongestLivingMember() : // First try to get from the longest alive member
                                 MembershipManager.getRandomMember(); // Else get from a random member
                 if (!sentMembersList.contains(TribesUtil.getHost(member))) {
-                    long tts = sender.sendToMember(command, member);
-                    configurationContext.
-                            setNonReplicableProperty(ClusteringConstants.TIME_TO_SEND,
-                                                     new Long(tts));
+                    sender.sendToMember(command, member);
                     sentMembersList.add(TribesUtil.getHost(member));
                     log.debug("WAITING FOR INITIALIZATION MESSAGE...");
-                    Thread.sleep(tts + 5 * (numberOfTries + 1));
+                    Thread.sleep(10 * (numberOfTries + 1));
                 }
             } catch (Exception e) {
                 log.error("Cannot get initialization information", e);

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java Fri Jan 11 09:17:19 2008
@@ -20,12 +20,11 @@
 
 import org.apache.axis2.clustering.ClusteringConstants;
 import org.apache.axis2.clustering.ClusteringFault;
-import org.apache.axis2.clustering.control.AckCommand;
 import org.apache.axis2.clustering.control.ControlCommand;
-import org.apache.axis2.clustering.control.GetStateCommand;
-import org.apache.axis2.clustering.control.GetStateResponseCommand;
 import org.apache.axis2.clustering.control.GetConfigurationCommand;
 import org.apache.axis2.clustering.control.GetConfigurationResponseCommand;
+import org.apache.axis2.clustering.control.GetStateCommand;
+import org.apache.axis2.clustering.control.GetStateResponseCommand;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.catalina.tribes.Member;
 
@@ -77,10 +76,6 @@
             getConfigRespCmd.
                     setServiceGroups(((GetConfigurationCommand) command).getServiceGroupNames());
             channelSender.sendToMember(getConfigRespCmd, sender);
-        } else if (command instanceof AckCommand) {
-            AckCommand cmd = (AckCommand) command;
-            cmd.setMemberId(TribesUtil.getHost(sender));
-            cmd.execute(configurationContext);
         } else {
             command.execute(configurationContext);
         }

Modified: webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java Fri Jan 11 09:17:19 2008
@@ -31,6 +31,7 @@
 import org.apache.axis2.context.ServiceGroupContext;
 import org.apache.axis2.description.AxisService;
 import org.apache.axis2.description.AxisServiceGroup;
+import org.apache.axis2.description.Parameter;
 import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.axis2.transport.http.server.HttpUtils;
 
@@ -38,12 +39,16 @@
 import java.util.List;
 
 /**
- *
+ * Tests the replication of properties placed in the ConfigurationContext, ServiceGroupContext &
+ * ServiceContext
  */
 public class ContextReplicationTest extends TestCase {
 
     private static final String TEST_SERVICE_NAME = "testService";
 
+    private static final Parameter domainParam =
+            new Parameter(ClusteringConstants.DOMAIN, "axis2.domain." + UUIDGenerator.getUUID());
+
     // --------------- Cluster-1 ------------------------------------------------------
     private ClusterManager clusterManager1;
     private ContextManager ctxMan1;
@@ -73,6 +78,7 @@
         ctxMan1 = getContextManager();
         configMan1 = getConfigurationManager();
         clusterManager1 = getClusterManager(configurationContext1, ctxMan1, configMan1);
+        clusterManager1.addParameter(domainParam);
         clusterManager1.init();
         System.out.println("ClusterManager-1 successfully initialized");
 
@@ -84,6 +90,7 @@
         ctxMan2 = getContextManager();
         configMan2 = getConfigurationManager();
         clusterManager2 = getClusterManager(configurationContext2, ctxMan2, configMan2);
+        clusterManager2.addParameter(domainParam);
         clusterManager2.init();
         System.out.println("ClusterManager-2 successfully initialized");
     }
@@ -139,8 +146,6 @@
             String val1 = "configCtxVal1";
             configurationContext1.setProperty(key1, val1);
             ctxMan1.updateContext(configurationContext1);
-            Thread.sleep(1000); // Give some time for the replication to take place
-
             String value = (String) configurationContext2.getProperty(key1);
             assertEquals(val1, value);
         }
@@ -150,8 +155,6 @@
             String val2 = "configCtxVal1";
             configurationContext2.setProperty(key2, val2);
             ctxMan2.updateContext(configurationContext2);
-            Thread.sleep(1000); // Give some time for the replication to take place
-
             String value = (String) configurationContext1.getProperty(key2);
             assertEquals(val2, value);
         }
@@ -165,8 +168,6 @@
         {
             configurationContext1.setProperty(key1, val1);
             ctxMan1.updateContext(configurationContext1);
-            Thread.sleep(1000); // Give some time for the replication to take place
-
             String value = (String) configurationContext2.getProperty(key1);
             assertEquals(val1, value);
         }
@@ -174,8 +175,6 @@
         // Next remove this property from cluster 2, replicate it, and check that it is unavailable in cluster 1
         configurationContext2.removeProperty(key1);
         ctxMan2.updateContext(configurationContext2);
-        Thread.sleep(1000); // Give some time for the replication to take place
-
         String value = (String) configurationContext1.getProperty(key1);
         assertNull(configurationContext2.getProperty(key1));
         assertNull(value);
@@ -199,8 +198,6 @@
         String val1 = "sgCtxVal1";
         serviceGroupContext1.setProperty(key1, val1);
         ctxMan1.updateContext(serviceGroupContext1);
-
-        Thread.sleep(1000);
         assertEquals(val1, serviceGroupContext2.getProperty(key1));
     }
 
@@ -223,15 +220,12 @@
         String val1 = "sgCtxVal1";
         serviceGroupContext1.setProperty(key1, val1);
         ctxMan1.updateContext(serviceGroupContext1);
-
-        Thread.sleep(1000);
         assertEquals(val1, serviceGroupContext2.getProperty(key1));
 
         // Remove the property
         serviceGroupContext2.removeProperty(key1);
         assertNull(serviceGroupContext2.getProperty(key1));
         ctxMan1.updateContext(serviceGroupContext2);
-        Thread.sleep(1000);
         assertNull(serviceGroupContext1.getProperty(key1));
     }
 
@@ -255,7 +249,6 @@
         serviceGroupContext1.setProperty(key1, val1);
         ctxMan1.updateContext(serviceGroupContext1);
 
-        Thread.sleep(1000);
         assertEquals(val1, serviceGroupContext2.getProperty(key1));
     }
 
@@ -281,14 +274,12 @@
         serviceGroupContext1.setProperty(key1, val1);
         ctxMan1.updateContext(serviceGroupContext1);
 
-        Thread.sleep(1000);
         assertEquals(val1, serviceGroupContext2.getProperty(key1));
 
         // Remove the property
         serviceGroupContext2.removeProperty(key1);
         assertNull(serviceGroupContext2.getProperty(key1));
         ctxMan1.updateContext(serviceGroupContext2);
-        Thread.sleep(1000);
         assertNull(serviceGroupContext1.getProperty(key1));
     }
 
@@ -315,7 +306,6 @@
         serviceContext1.setProperty(key1, val1);
         ctxMan1.updateContext(serviceContext1);
 
-        Thread.sleep(1000);
         assertEquals(val1, serviceContext2.getProperty(key1));
     }
 
@@ -342,7 +332,6 @@
         serviceContext1.setProperty(key1, val1);
         ctxMan1.updateContext(serviceContext1);
 
-        Thread.sleep(1000);
         assertEquals(val1, serviceContext2.getProperty(key1));
     }
 
@@ -370,14 +359,12 @@
         serviceContext1.setProperty(key1, val1);
         ctxMan1.updateContext(serviceContext1);
 
-        Thread.sleep(1000);
         assertEquals(val1, serviceContext2.getProperty(key1));
 
         // Remove the property
         serviceContext2.removeProperty(key1);
         assertNull(serviceContext2.getProperty(key1));
         ctxMan1.updateContext(serviceContext2);
-        Thread.sleep(1000);
         assertNull(serviceContext1.getProperty(key1));
     }
 
@@ -405,14 +392,12 @@
         serviceContext1.setProperty(key1, val1);
         ctxMan1.updateContext(serviceContext1);
 
-        Thread.sleep(1000);
         assertEquals(val1, serviceContext2.getProperty(key1));
 
         // Remove the property
         serviceContext2.removeProperty(key1);
         assertNull(serviceContext2.getProperty(key1));
         ctxMan1.updateContext(serviceContext2);
-        Thread.sleep(1000);
         assertNull(serviceContext1.getProperty(key1));
     }
 
@@ -424,7 +409,6 @@
         exclusionPatterns.add("local_*");
         ctxMan1.setReplicationExcludePatterns("defaults", exclusionPatterns);
         ctxMan1.updateContext(configurationContext1);
-        Thread.sleep(1000); // Give some time for the replication to take place
 
         String value = (String) configurationContext2.getProperty(key1);
         assertNull(value); // The property should not have gotten replicated
@@ -439,7 +423,6 @@
         ctxMan1.setReplicationExcludePatterns("org.apache.axis2.context.ConfigurationContext",
                                               exclusionPatterns);
         ctxMan1.updateContext(configurationContext1);
-        Thread.sleep(1000); // Give some time for the replication to take place
 
         String value = (String) configurationContext2.getProperty(key1);
         assertNull(value); // The property should not have gotten replicated
@@ -467,7 +450,6 @@
         ctxMan1.setReplicationExcludePatterns("defaults",
                                               exclusionPatterns2);
         ctxMan1.updateContext(configurationContext1);
-        Thread.sleep(1000); // Give some time for the replication to take place
 
         String value1 = (String) configurationContext2.getProperty(key1);
         assertNull(value1); // The property should not have gotten replicated

Modified: webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ObjectSerializationTest.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ObjectSerializationTest.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ObjectSerializationTest.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ObjectSerializationTest.java Fri Jan 11 09:17:19 2008
@@ -16,8 +16,6 @@
 package org.apache.axis2.clustering;
 
 import junit.framework.TestCase;
-import org.apache.axis2.clustering.control.AckCommand;
-import org.apache.axis2.clustering.TestDO;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -31,17 +29,6 @@
 public class ObjectSerializationTest extends TestCase {
 
     public void testSerialization() throws IOException, ClassNotFoundException {
-        AckCommand ackCommand = new AckCommand("uuid");
-        ackCommand.setMemberId("123456");
-
-        AckCommand ackCommand2 = (AckCommand) copy(ackCommand);
-
-        assertNotNull(ackCommand2);
-        assertFalse(ackCommand.equals(ackCommand2));
-        assertEquals(ackCommand.getUniqueId(), ackCommand2.getUniqueId());
-    }
-
-    public void testSerialization2() throws IOException, ClassNotFoundException {
         TestDO testDO = new TestDO("name", "value");
         TestDO testDO2 = (TestDO) copy(testDO);
 

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java Fri Jan 11 09:17:19 2008
@@ -39,7 +39,8 @@
     public static final String NODE_MANAGER_SERVICE = "Axis2NodeManager";
     public static final String REQUEST_BLOCKING_HANDLER = "RequestBlockingHandler";
     public static final String CLUSTER_INITIALIZED = "local_cluster.initialized";
-    public static final String TIME_TO_SEND = "local_cluster.time.to.send";
+    public static final String RECD_CONFIG_INIT_MSG = "local_recd.config.init.method";
+    public static final String RECD_STATE_INIT_MSG = "local_recd.state.init.method";
     public static final String BLOCK_ALL_REQUESTS = "local_wso2wsas.block.requests";
     public static final String LOCAL_IP_ADDRESS = "axis2.local.ip.address";
 

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/MessageSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/MessageSender.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/MessageSender.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/MessageSender.java Fri Jan 11 09:17:19 2008
@@ -23,7 +23,7 @@
  */
 public interface MessageSender {
 
-    public long sendToGroup(ClusteringCommand msg) throws ClusteringFault;
+    public void sendToGroup(ClusteringCommand msg) throws ClusteringFault;
 
     public void sendToSelf(ClusteringCommand msg) throws ClusteringFault;
 

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextClusteringCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextClusteringCommand.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextClusteringCommand.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextClusteringCommand.java Fri Jan 11 09:17:19 2008
@@ -25,15 +25,6 @@
 
 public abstract class ContextClusteringCommand extends ClusteringCommand {
 
-    protected String uniqueId;
-
     public abstract void execute(ConfigurationContext configContext) throws ClusteringFault;
 
-    public String getUniqueId() {
-        return uniqueId;
-    }
-
-    public void setUniqueId(String uniqueId) {
-        this.uniqueId = uniqueId;
-    }
 }

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java Fri Jan 11 09:17:19 2008
@@ -34,11 +34,10 @@
      * This could be addition of new properties, modifications of existing properties or
      * removal of properties.
      *
-     * @param context The AbstractContext containing the properties to be replicated
-     * @return The UUID of the message that was sent to the group communications framework
+     * @param context The context to be replicated
      * @throws ClusteringFault If replication fails
      */
-    String updateContext(AbstractContext context) throws ClusteringFault;
+    void updateContext(AbstractContext context) throws ClusteringFault;
 
     /**
      * This method is called when one need to update/replicate only certains properties in the
@@ -46,10 +45,9 @@
      *
      * @param context       The AbstractContext containing the properties to be replicated
      * @param propertyNames The names of the specific properties that should be replicated
-     * @return The UUID of the message that was sent to the group communications framework
      * @throws ClusteringFault If replication fails
      */
-    String updateContext(AbstractContext context, String[] propertyNames) throws ClusteringFault;
+    void updateContext(AbstractContext context, String[] propertyNames) throws ClusteringFault;
 
     /**
      * This method is called when properties in a collection of {@link AbstractContext}s are updated.
@@ -57,19 +55,17 @@
      * removal of properties.
      *
      * @param contexts The AbstractContexts containing the properties to be replicated
-     * @return The UUID of the message that was sent to the group communications framework
      * @throws ClusteringFault If replication fails
      */
-    String updateContexts(AbstractContext[] contexts) throws ClusteringFault;
+    void updateContexts(AbstractContext[] contexts) throws ClusteringFault;
 
     /**
      * This method is called when {@link AbstractContext} is removed from the system
      *
      * @param context The AbstractContext to be removed
-     * @return The UUID of the message that was sent to the group communications framework
      * @throws ClusteringFault If context removal fails
      */
-    String removeContext(AbstractContext context) throws ClusteringFault;
+    void removeContext(AbstractContext context) throws ClusteringFault;
 
     /**
      * @param context AbstractContext
@@ -85,7 +81,7 @@
      *         false - otherwise
      * @throws ClusteringFault If an error occurs while checking whether a message is ACKed
      */
-    boolean isMessageAcknowledged(String messageUniqueId) throws ClusteringFault;
+//    boolean isMessageAcknowledged(String messageUniqueId) throws ClusteringFault;
 
     /**
      * @param listener ContextManagerListener

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java Fri Jan 11 09:17:19 2008
@@ -20,7 +20,6 @@
 package org.apache.axis2.clustering.context;
 
 import org.apache.axis2.clustering.ClusterManager;
-import org.apache.axis2.clustering.ClusteringConstants;
 import org.apache.axis2.clustering.ClusteringFault;
 import org.apache.axis2.context.AbstractContext;
 import org.apache.axis2.context.ConfigurationContext;
@@ -79,10 +78,7 @@
         if (!contexts.isEmpty()) {
             AbstractContext[] contextArray =
                     (AbstractContext[]) contexts.toArray(new AbstractContext[contexts.size()]);
-            String msgUUID = contextManager.updateContexts(contextArray);
-            if (getClusterManager(msgContext).synchronizeAllMembers()) {
-                waitForACKs(contextManager, msgUUID, msgContext.getRootContext());
-            }
+            contextManager.updateContexts(contextArray);
         }
     }
 
@@ -99,10 +95,7 @@
         log.debug("Going to replicate state in " + abstractContext + "...");
         ContextManager contextManager = getContextManager(abstractContext);
         if (!abstractContext.getPropertyDifferences().isEmpty()) {
-            String msgUUID = contextManager.updateContext(abstractContext);
-            if (getClusterManager(abstractContext).synchronizeAllMembers()) {
-                waitForACKs(contextManager, msgUUID, abstractContext.getRootContext());
-            }
+            contextManager.updateContext(abstractContext);
         }
     }
 
@@ -121,12 +114,7 @@
         }
         log.debug("Going to replicate selected properties in " + abstractContext + "...");
         ContextManager contextManager = getContextManager(abstractContext);
-        String msgUUID = contextManager.updateContext(abstractContext, propertyNames);
-        if (msgUUID != null) {
-            if (getClusterManager(abstractContext).synchronizeAllMembers()) {
-                waitForACKs(contextManager, msgUUID, abstractContext.getRootContext());
-            }
-        }
+        contextManager.updateContext(abstractContext, propertyNames);
     }
 
     private static ClusterManager getClusterManager(AbstractContext abstractContext) {
@@ -168,33 +156,5 @@
         ClusterManager clusterManager =
                 messageContext.getRootContext().getAxisConfiguration().getClusterManager();
         return clusterManager != null && clusterManager.getContextManager() != null;
-    }
-
-    private static void waitForACKs(ContextManager contextManager,
-                                    String msgUUID,
-                                    ConfigurationContext configCtx) throws ClusteringFault {
-
-        long start = System.currentTimeMillis();
-
-        // Wait till all members have ACKed receipt & successful processing of
-        // the message with UUID 'msgUUID'
-        do {
-
-            // Wait sometime before checking whether message is ACKed
-            try {
-                Long tts =
-                        (Long) configCtx.getPropertyNonReplicable(ClusteringConstants.TIME_TO_SEND);
-                if (tts == null) {
-                    Thread.sleep(5);
-                } else if (tts.longValue() >= 0) {
-                    Thread.sleep(tts.longValue() + 5); // Time to recv ACK + time in queue & processing replication request
-                }
-            } catch (InterruptedException ignored) {
-            }
-            if (System.currentTimeMillis() - start > 45000) {
-                throw new ClusteringFault("ACKs not received from all members within 45 sec. " +
-                                          "Aborting wait.");
-            }
-        } while (!contextManager.isMessageAcknowledged(msgUUID));
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org