You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by az...@apache.org on 2007/05/30 13:10:30 UTC

svn commit: r542797 - in /webservices/axis2/trunk/java/modules: clustering/src/org/apache/axis2/clustering/context/ clustering/src/org/apache/axis2/clustering/control/ clustering/src/org/apache/axis2/clustering/handlers/ clustering/src/org/apache/axis2...

Author: azeez
Date: Wed May 30 04:10:28 2007
New Revision: 542797

URL: http://svn.apache.org/viewvc?view=rev&rev=542797
Log:
Introducing a ACKing mechanism. The response is sent to the client only if the state change is successfully replicated across the cluster.
Each member has to send an ACK for a particular message received.


Added:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java
Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/AckCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java Wed May 30 04:10:28 2007
@@ -15,14 +15,16 @@
  */
 package org.apache.axis2.clustering.context;
 
+import org.apache.axiom.om.util.UUIDGenerator;
 import org.apache.axis2.clustering.context.commands.*;
+import org.apache.axis2.clustering.tribes.AckManager;
 import org.apache.axis2.context.*;
 import org.apache.axis2.deployment.DeploymentConstants;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.axiom.om.util.UUIDGenerator;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -34,6 +36,26 @@
 
     private static final Log log = LogFactory.getLog(ContextClusteringCommandFactory.class);
 
+    public static ContextClusteringCommandCollection
+            getCommandCollection(AbstractContext[] contexts,
+                                 Map excludedReplicationPatterns) {
+
+        ArrayList commands = new ArrayList(contexts.length);
+        ContextClusteringCommandCollection collection =
+                new ContextClusteringCommandCollection(commands);
+        for (int i = 0; i < contexts.length; i++) {
+            ContextClusteringCommand cmd = getUpdateCommand(contexts[i],
+                                                            excludedReplicationPatterns,
+                                                            false);
+            if (cmd != null) {
+                commands.add(cmd);
+            }
+        }
+        collection.setUniqueId(UUIDGenerator.getUUID());
+        AckManager.addInitialAcknowledgement(collection);
+        return collection;
+    }
+
     /**
      * @param context
      * @param excludedPropertyPatterns
@@ -82,6 +104,8 @@
         }
         if (cmd != null && ((UpdateContextCommand) cmd).isPropertiesEmpty()) {
             cmd = null;
+        } else {
+            AckManager.addInitialAcknowledgement(cmd);
         }
         context.clearPropertyDifferences(); // Once we send the diffs, we should clear the diffs
         return cmd;
@@ -175,7 +199,7 @@
         if (abstractContext instanceof ServiceGroupContext) {
             ServiceGroupContext sgCtx = (ServiceGroupContext) abstractContext;
             ServiceGroupContextCommand cmd = new CreateServiceGroupContextCommand();
-            //TODO impl
+            cmd.setUniqueId(UUIDGenerator.getUUID());
             cmd.setServiceGroupName(sgCtx.getDescription().getServiceGroupName());
             cmd.setServiceGroupContextId(sgCtx.getId());
             return cmd;
@@ -183,6 +207,7 @@
             ServiceContext serviceCtx = (ServiceContext) abstractContext;
             ServiceContextCommand cmd = new CreateServiceContextCommand();
             ServiceGroupContext sgCtx = (ServiceGroupContext) serviceCtx.getParent();
+            cmd.setUniqueId(UUIDGenerator.getUUID());
             cmd.setServiceGroupContextId(sgCtx.getId());
             cmd.setServiceGroupName(sgCtx.getDescription().getServiceGroupName());
             cmd.setServiceName(serviceCtx.getAxisService().getName());
@@ -196,6 +221,7 @@
             ServiceGroupContext sgCtx = (ServiceGroupContext) abstractContext;
             ServiceGroupContextCommand cmd = new DeleteServiceGroupContextCommand();
             // TODO: impl
+            cmd.setUniqueId(UUIDGenerator.getUUID());
             cmd.setServiceGroupName(sgCtx.getDescription().getServiceGroupName());
             cmd.setServiceGroupContextId(sgCtx.getId());
             return cmd;
@@ -203,6 +229,7 @@
             ServiceContext serviceCtx = (ServiceContext) abstractContext;
             ServiceContextCommand cmd = new DeleteServiceContextCommand();
             // TODO: impl
+            cmd.setUniqueId(UUIDGenerator.getUUID());
             cmd.setServiceGroupName(serviceCtx.getGroupName());
             cmd.setServiceName(serviceCtx.getAxisService().getName());
             return cmd;

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java Wed May 30 04:10:28 2007
@@ -20,6 +20,8 @@
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.clustering.ClusteringFault;
 import org.apache.axis2.clustering.MessageSender;
+import org.apache.axis2.clustering.tribes.AckManager;
+import org.apache.axis2.clustering.tribes.ChannelSender;
 import org.apache.axis2.clustering.context.commands.ContextClusteringCommandCollection;
 import org.apache.axis2.context.AbstractContext;
 import org.apache.axis2.context.ConfigurationContext;
@@ -35,56 +37,59 @@
 
     private Map parameters = new HashMap();
 
-    private MessageSender sender;
+    private ChannelSender sender;
     private ContextReplicationProcessor processor = new ContextReplicationProcessor();
 
     private Map excludedReplicationPatterns = new HashMap();
 
-    public void setSender(MessageSender sender) {
+    //TODO: Try how to use an interface
+    public void setSender(ChannelSender sender) {
         this.sender = sender;
     }
 
     public DefaultContextManager() {
     }
 
-    public void addContext(final AbstractContext context) throws ClusteringFault {
-        processor.process(ContextClusteringCommandFactory.getCreateCommand(context));
+    public String addContext(final AbstractContext context) throws ClusteringFault {
+        ContextClusteringCommand cmd = ContextClusteringCommandFactory.getCreateCommand(context);
+        processor.process(cmd);
+        return cmd.getUniqueId();
     }
 
-    public void removeContext(AbstractContext context) throws ClusteringFault {
-        processor.process(ContextClusteringCommandFactory.getRemoveCommand(context));
+    public String removeContext(AbstractContext context) throws ClusteringFault {
+        ContextClusteringCommand cmd = ContextClusteringCommandFactory.getRemoveCommand(context);
+        processor.process(cmd);
+        return cmd.getUniqueId();
     }
 
-    public void updateContext(AbstractContext context) throws ClusteringFault {
-        ContextClusteringCommand message =
+    public String updateContext(AbstractContext context) throws ClusteringFault {
+        ContextClusteringCommand cmd =
                 ContextClusteringCommandFactory.getUpdateCommand(context,
                                                                  excludedReplicationPatterns,
                                                                  false);
-        if (message != null) {
-            processor.process(message);
+        if (cmd != null) {
+            processor.process(cmd);
+            return cmd.getUniqueId();
         }
+        return null;
     }
 
-    public void updateContexts(AbstractContext[] contexts) throws ClusteringFault {
-        ArrayList commands = new ArrayList(contexts.length);
-        ContextClusteringCommandCollection collection =
-                new ContextClusteringCommandCollection(commands);
-        for (int i = 0; i < contexts.length; i++) {
-            ContextClusteringCommand cmd =
-                    ContextClusteringCommandFactory.getUpdateCommand(contexts[i],
-                                                                     excludedReplicationPatterns,
-                                                                     false);
-            if (cmd != null) {
-                commands.add(cmd);
-            }
-        }
-        processor.process(collection);
+    public String updateContexts(AbstractContext[] contexts) throws ClusteringFault {
+        ContextClusteringCommandCollection cmd =
+                ContextClusteringCommandFactory.getCommandCollection(contexts,
+                                                                     excludedReplicationPatterns);
+        processor.process(cmd);
+        return cmd.getUniqueId();
     }
 
     public boolean isContextClusterable(AbstractContext context) {
         return (context instanceof ConfigurationContext) ||
                (context instanceof ServiceContext) ||
                (context instanceof ServiceGroupContext);
+    }
+
+    public boolean isMessageAcknowledged(String messageUniqueId) throws ClusteringFault {
+        return AckManager.isMessageAcknowledged(messageUniqueId, sender);
     }
 
     public void process(ContextClusteringCommand command) throws ClusteringFault {

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/AckCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/AckCommand.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/AckCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/AckCommand.java Wed May 30 04:10:28 2007
@@ -15,13 +15,16 @@
  */
 package org.apache.axis2.clustering.control;
 
-import org.apache.axis2.clustering.ClusteringCommand;
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.tribes.AckManager;
+import org.apache.axis2.context.ConfigurationContext;
 
 /**
- *  ACK for the message with id <code>uniqueId</code>
+ * ACK for the message with id <code>uniqueId</code>
  */
-public class AckCommand extends ClusteringCommand {
+public class AckCommand extends ControlCommand {
     private String uniqueId;
+    private String memberId;
 
     public AckCommand(String messageUniqueId) {
         this.uniqueId = messageUniqueId;
@@ -31,7 +34,19 @@
         return uniqueId;
     }
 
+    public void setMemberId(String memberId) {
+        this.memberId = memberId;
+    }
+
     public int getCommandType() {
         return Integer.MAX_VALUE;
+    }
+
+    public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
+        AckManager.addAcknowledgement(uniqueId, memberId);
+    }
+
+    public String toString() {
+        return "ACK for message with UUID " + uniqueId;
     }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java Wed May 30 04:10:28 2007
@@ -59,15 +59,17 @@
                 if (updateCmd != null) {
                     cmdList.add(updateCmd);
                 }
-                for (Iterator iter2 = sgCtx.getServiceContexts(); iter2.hasNext();) {
-                    ServiceContext serviceCtx = (ServiceContext) iter2.next();
-                    cmdList.add(ContextClusteringCommandFactory.getCreateCommand(serviceCtx));
-                    ContextClusteringCommand updateServiceCtxCmd =
-                            ContextClusteringCommandFactory.getUpdateCommand(serviceCtx,
-                                                                             excludedPropPatterns,
-                                                                             true);
-                    if (updateServiceCtxCmd != null) {
-                        cmdList.add(updateServiceCtxCmd);
+                if (sgCtx.getServiceContexts() != null) {
+                    for (Iterator iter2 = sgCtx.getServiceContexts(); iter2.hasNext();) {
+                        ServiceContext serviceCtx = (ServiceContext) iter2.next();
+                        cmdList.add(ContextClusteringCommandFactory.getCreateCommand(serviceCtx));
+                        ContextClusteringCommand updateServiceCtxCmd =
+                                ContextClusteringCommandFactory.getUpdateCommand(serviceCtx,
+                                                                                 excludedPropPatterns,
+                                                                                 true);
+                        if (updateServiceCtxCmd != null) {
+                            cmdList.add(updateServiceCtxCmd);
+                        }
                     }
                 }
             }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java Wed May 30 04:10:28 2007
@@ -35,7 +35,13 @@
 
     public InvocationResponse invoke(MessageContext msgContext) throws AxisFault {
         log.debug("Going to replicate state on invoke");
-        replicateState(msgContext);
+        try {
+            replicateState(msgContext);
+        } catch (Exception e) {
+            System.err.println("###########################");
+            e.printStackTrace();
+            System.err.println("###########################");
+        }
         return InvocationResponse.CONTINUE;
     }
 
@@ -85,10 +91,28 @@
 
             // Do the actual replication here
             if (!contexts.isEmpty()) {
-                contextManager.
+                String msgUUID = contextManager.
                         updateContexts((AbstractContext[]) contexts.
                                 toArray(new AbstractContext[contexts.size()]));
+
+                long start = System.currentTimeMillis();
+
+                // Wait till all members have ACKed receipt & successful processing of
+                // the message with UUID 'msgUUID'
+                while (!contextManager.isMessageAcknowledged(msgUUID)) {
+                    if(System.currentTimeMillis() - start > 20000){
+                        throw new ClusteringFault("ACKs not received from all members within 1 min. " +
+                                                  "Aborting wait.");
+                    }
+                    try {
+                        Thread.sleep(2);
+                    } catch (InterruptedException e) {
+                        log.error(e);
+                        break;
+                    }
+                }
             }
+
         } else {
             String msg = "Cannot replicate contexts since " +
                          "ClusterManager is not specified in the axis2.xml file.";

Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java?view=auto&rev=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java Wed May 30 04:10:28 2007
@@ -0,0 +1,93 @@
+/*                                                                             
+ * Copyright 2004,2005 The Apache Software Foundation.                         
+ *                                                                             
+ * Licensed under the Apache License, Version 2.0 (the "License");             
+ * you may not use this file except in compliance with the License.            
+ * You may obtain a copy of the License at                                     
+ *                                                                             
+ *      http://www.apache.org/licenses/LICENSE-2.0                             
+ *                                                                             
+ * Unless required by applicable law or agreed to in writing, software         
+ * distributed under the License is distributed on an "AS IS" BASIS,           
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.    
+ * See the License for the specific language governing permissions and         
+ * limitations under the License.                                              
+ */
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.context.ContextClusteringCommand;
+import org.apache.catalina.tribes.Member;
+
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+/**
+ * 
+ */
+public final class AckManager {
+
+    private static Map messageAckTable = new Hashtable();
+
+    public static void addInitialAcknowledgement(ContextClusteringCommand command) {
+        messageAckTable.put(command.getUniqueId(), new MessageACK(command));
+    }
+
+    public static void addAcknowledgement(String messageUniqueId,
+                                          String memberId) {
+        MessageACK ack = (MessageACK) messageAckTable.get(messageUniqueId);
+        if (ack != null) {
+            List memberList = ack.getMemberList();
+            memberList.add(memberId);
+        }
+    }
+
+    public static boolean isMessageAcknowledged(String messageUniqueId,
+                                                ChannelSender sender) throws ClusteringFault {
+        boolean isAcknowledged = false;
+        MessageACK ack = (MessageACK) messageAckTable.get(messageUniqueId);
+        List memberList = ack.getMemberList();
+
+        // Check that all members in the memberList are same as the total member list,
+        // which will indicate that all members have ACKed the message
+        Member[] members = sender.getChannel().getMembers();
+        for (int i = 0; i < members.length; i++) {
+            Member member = members[i];
+            if (!memberList.contains(member.getName())) {
+
+                // At this point, resend the original message back to the node which has not
+                // sent an ACK
+                sender.sendToMember(ack.getCommand(), member);
+                isAcknowledged = false;
+                break;
+            } else {
+                isAcknowledged = true;
+            }
+        }
+
+        // If a message is ACKed, we don't have to keep track of it in our ackTbl anymore
+        if (isAcknowledged) {
+            messageAckTable.remove(messageUniqueId);
+        }
+        return isAcknowledged;
+    }
+
+    private static class MessageACK {
+        private ContextClusteringCommand command;
+        private List memberList = new Vector();
+
+        public MessageACK(ContextClusteringCommand command) {
+            this.command = command;
+        }
+
+        public ContextClusteringCommand getCommand() {
+            return command;
+        }
+
+        public List getMemberList() {
+            return memberList;
+        }
+    }
+}

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Wed May 30 04:10:28 2007
@@ -16,12 +16,17 @@
 
 package org.apache.axis2.clustering.tribes;
 
+import org.apache.axis2.clustering.ClusteringConstants;
 import org.apache.axis2.clustering.configuration.ConfigurationClusteringCommand;
 import org.apache.axis2.clustering.configuration.DefaultConfigurationManager;
 import org.apache.axis2.clustering.context.ContextClusteringCommand;
 import org.apache.axis2.clustering.context.DefaultContextManager;
+import org.apache.axis2.clustering.context.commands.ContextClusteringCommandCollection;
+import org.apache.axis2.clustering.context.commands.UpdateContextCommand;
 import org.apache.axis2.clustering.control.AckCommand;
 import org.apache.axis2.clustering.control.ControlCommand;
+import org.apache.axis2.clustering.control.GetStateResponseCommand;
+import org.apache.axis2.context.ConfigurationContext;
 import org.apache.catalina.tribes.Member;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,7 +54,10 @@
      */
     private Thread messageProcessor;
 
-    public ChannelListener(DefaultConfigurationManager configurationManager,
+    private ConfigurationContext configurationContext;
+
+    public ChannelListener(ConfigurationContext configurationContext,
+                           DefaultConfigurationManager configurationManager,
                            DefaultContextManager contextManager,
                            TribesControlCommandProcessor controlCommandProcessor,
                            ChannelSender sender) {
@@ -57,6 +65,7 @@
         this.contextManager = contextManager;
         this.controlCommandProcessor = controlCommandProcessor;
         this.sender = sender;
+        this.configurationContext = configurationContext;
         startMessageProcessor();
     }
 
@@ -68,12 +77,24 @@
         this.configurationManager = configurationManager;
     }
 
+    public void setConfigurationContext(ConfigurationContext configurationContext) {
+        this.configurationContext = configurationContext;
+    }
+
     public boolean accept(Serializable msg, Member sender) {
         return true;
     }
 
     public void messageReceived(Serializable msg, Member sender) {
-        log.debug("Message received : " + msg);
+
+        // If the system has not still been intialized, reject all incoming messages, except the
+        // GetStateResponseCommand message
+        if (configurationContext.
+                getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null
+            && !(msg instanceof GetStateResponseCommand)) {
+            return;
+        }
+        log.debug("RECEIVED MESSAGE " + msg);
         synchronized (cmdQueue) {
             cmdQueue.enqueue(new MemberMessage(msg, sender));
         }
@@ -129,10 +150,16 @@
                     if (msg instanceof ContextClusteringCommand && contextManager != null) {
                         ContextClusteringCommand ctxCmd = (ContextClusteringCommand) msg;
                         contextManager.process(ctxCmd);
-                        AckCommand ackCmd = new AckCommand(ctxCmd.getUniqueId());
 
-                        // Send the ACK
-                        sender.sendToMember(ackCmd, memberMessage.getSender());
+                        // Sending ACKs for ContextClusteringCommandCollection or
+                        // UpdateContextCommand is sufficient
+                        if (msg instanceof ContextClusteringCommandCollection ||
+                            msg instanceof UpdateContextCommand) {
+                            AckCommand ackCmd = new AckCommand(ctxCmd.getUniqueId());
+
+                            // Send the ACK
+                            sender.sendToMember(ackCmd, memberMessage.getSender());
+                        }
                     } else if (msg instanceof ConfigurationClusteringCommand &&
                                configurationManager != null) {
                         configurationManager.process((ConfigurationClusteringCommand) msg);

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Wed May 30 04:10:28 2007
@@ -22,15 +22,11 @@
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.Member;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 public class ChannelSender implements MessageSender {
 
     private Channel channel;
 
-    private static final Log log = LogFactory.getLog(ChannelSender.class);
-
     public void sendToGroup(ClusteringCommand msg) throws ClusteringFault {
         if(channel == null) return;
         Member[] members = channel.getMembers();
@@ -82,14 +78,5 @@
 
     public void setChannel(Channel channel) {
         this.channel = channel;
-    }
-
-    private void printMember(Member member) {
-        member.getUniqueId();
-        log.debug("\n===============================");
-        log.debug("Member Name " + member.getName());
-        log.debug("Member Host" + member.getHost());
-        log.debug("Member Payload" + member.getPayload());
-        log.debug("===============================\n");
     }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Wed May 30 04:10:28 2007
@@ -52,6 +52,7 @@
     private ManagedChannel channel;
     private ConfigurationContext configurationContext;
     private TribesControlCommandProcessor controlCmdProcessor;
+    private ChannelListener channelListener;
 
     public TribesClusterManager() {
         parameters = new HashMap();
@@ -69,10 +70,11 @@
     public void init() throws ClusteringFault {
         ChannelSender sender = new ChannelSender();
 
-        ChannelListener listener = new ChannelListener(configurationManager,
-                                                       contextManager,
-                                                       controlCmdProcessor,
-                                                       sender);
+        channelListener = new ChannelListener(configurationContext,
+                                              configurationManager,
+                                              contextManager,
+                                              controlCmdProcessor,
+                                              sender);
 
         if (configurationManager != null) {
             configurationManager.setSender(sender);
@@ -114,7 +116,7 @@
 //            tcpFailureDetector.setPrevious(nbc);
 //            channel.addInterceptor(tcpFailureDetector);
 
-            channel.addChannelListener(listener);
+            channel.addChannelListener(channelListener);
             TribesMembershipListener membershipListener = new TribesMembershipListener();
             channel.addMembershipListener(membershipListener);
             channel.start(Channel.DEFAULT);
@@ -122,7 +124,7 @@
 
             if (contextManager != null) {
                 contextManager.setSender(sender);
-                listener.setContextManager(contextManager);
+                channelListener.setContextManager(contextManager);
 
                 Member[] members = channel.getMembers();
                 TribesUtil.printMembers(members);
@@ -220,5 +222,8 @@
     public void setConfigurationContext(ConfigurationContext configurationContext) {
         this.configurationContext = configurationContext;
         controlCmdProcessor.setConfigurationContext(configurationContext);
+        if (channelListener != null) {
+            channelListener.setConfigurationContext(configurationContext);
+        }
     }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java Wed May 30 04:10:28 2007
@@ -17,6 +17,7 @@
 
 import org.apache.axis2.clustering.ClusteringConstants;
 import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.control.AckCommand;
 import org.apache.axis2.clustering.control.ControlCommand;
 import org.apache.axis2.clustering.control.GetStateCommand;
 import org.apache.axis2.clustering.control.GetStateResponseCommand;
@@ -58,6 +59,10 @@
             GetStateResponseCommand getStateRespCmd = new GetStateResponseCommand();
             getStateRespCmd.setCommands(((GetStateCommand) command).getCommands());
             channelSender.sendToMember(getStateRespCmd, sender);
+        } else if (command instanceof AckCommand) {
+            AckCommand cmd = (AckCommand) command;
+            cmd.setMemberId(sender.getName());
+            cmd.execute(configurationContext);
         } else {
             command.execute(configurationContext);
         }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java Wed May 30 04:10:28 2007
@@ -17,10 +17,8 @@
 
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.MembershipListener;
-import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.axis2.context.ConfigurationContext;
 
 /**
  * 
@@ -28,32 +26,14 @@
 public class TribesMembershipListener implements MembershipListener {
 
     private static Log log = LogFactory.getLog(TribesMembershipListener.class);
-//    private ConfigurationContext configContext;
 
     public void memberAdded(Member member) {
-        log.info("New member " + getHostSocket(member) + " added to Tribes group.");
-        /* TODO: Send state information to this member.
-        But if all of the members start sending these messages, there is
-        it is going to be messy. Need to ensure that only one node send this message*/
-
-//        System.err.println("++++++ IS COORD="+TribesClusterManager.nbc.isCoordinator());
+        log.info("New member " + member.getName() + " joined cluster.");
+       //        System.err.println("++++++ IS COORD="+TribesClusterManager.nbc.isCoordinator());
     }
 
     public void memberDisappeared(Member member) {
-        log.info("Member " + getHostSocket(member) + " left Tribes group");
+        log.info("Member " + member.getName() + " left cluster");
 //        System.err.println("++++++ IS COORD="+TribesClusterManager.nbc.isCoordinator());
     }
-
-    private String getHostSocket(Member member) {
-        String host = null;
-        byte[] hostBytes = member.getHost();
-        for (int i = 0; i < hostBytes.length; i++) {
-            host = (host == null) ? ("" + hostBytes[i]) : (host + "." + hostBytes[i]);
-        }
-        return host + ":" + member.getPort();
-    }/*
-
-    public void setConfigContext(ConfigurationContext configContext) {
-        this.configContext = configContext;
-    }*/
 }

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java Wed May 30 04:10:28 2007
@@ -18,6 +18,7 @@
 package org.apache.axis2.clustering;
 
 public final class ClusteringConstants {
+
     private ClusteringConstants() {
     }
 

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java Wed May 30 04:10:28 2007
@@ -30,17 +30,19 @@
      * This method is called when a new {@link AbstractContext} is added to the system
      *
      * @param context
+     * @return The UUID of the message that was sent to the group communications framework
      * @throws ClusteringFault
      */
-    void addContext(AbstractContext context) throws ClusteringFault;
+    String addContext(AbstractContext context) throws ClusteringFault;
 
     /**
      * This method is called when a new {@link AbstractContext} is removed from the system
      *
      * @param context
+     * @return The UUID of the message that was sent to the group communications framework
      * @throws ClusteringFault
      */
-    void removeContext(AbstractContext context) throws ClusteringFault;
+    String removeContext(AbstractContext context) throws ClusteringFault;
 
     /**
      * This method is called when properties in an {@link AbstractContext} are updated.
@@ -48,9 +50,10 @@
      * removal of properties.
      *
      * @param context
+     * @return The UUID of the message that was sent to the group communications framework
      * @throws ClusteringFault
      */
-    void updateContext(AbstractContext context) throws ClusteringFault;
+    String updateContext(AbstractContext context) throws ClusteringFault;
 
     /**
      * This method is called when properties in a collection of {@link AbstractContext}s are updated.
@@ -58,17 +61,27 @@
      * removal of properties.
      *
      * @param contexts
+     * @return The UUID of the message that was sent to the group communications framework
      * @throws ClusteringFault
      */
-    void updateContexts(AbstractContext[] contexts) throws ClusteringFault;
+    String updateContexts(AbstractContext[] contexts) throws ClusteringFault;
 
     /**
-     *
      * @param context
      * @return True - if the provided {@link AbstractContext}  is clusterable
      * @throws ClusteringFault
      */
     boolean isContextClusterable(AbstractContext context) throws ClusteringFault;
+
+    /**
+     * Indicates whether a particular message has been ACKed by all members of a cluster
+     *
+     * @param messageUniqueId
+     * @return true - if all memebers have ACKed the message with ID <code>messageUniqueId</code>
+     *         false - otherwise
+     * @throws ClusteringFault
+     */
+    boolean isMessageAcknowledged(String messageUniqueId) throws ClusteringFault;
 
     /**
      * @param listener



---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org