You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by az...@apache.org on 2007/06/06 16:09:42 UTC

svn commit: r544845 - in /webservices/axis2/trunk/java/modules: clustering/src/org/apache/axis2/clustering/configuration/commands/ clustering/src/org/apache/axis2/clustering/context/commands/ clustering/src/org/apache/axis2/clustering/control/ clusteri...

Author: azeez
Date: Wed Jun  6 07:09:41 2007
New Revision: 544845

URL: http://svn.apache.org/viewvc?view=rev&rev=544845
Log:
A few more improvements to the clustering implementation.



Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/CommitCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/ExceptionCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/PrepareCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/RollbackCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateContextCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextClusteringCommand.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/description/AxisOperation.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/CommitCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/CommitCommand.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/CommitCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/CommitCommand.java Wed Jun  6 07:09:41 2007
@@ -41,4 +41,8 @@
     public void rollback(ConfigurationContext configContext) throws Exception {
         // Nothing to implement
     }
+
+    public String toString() {
+        return "CommitCommand";
+    }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/ExceptionCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/ExceptionCommand.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/ExceptionCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/ExceptionCommand.java Wed Jun  6 07:09:41 2007
@@ -51,4 +51,8 @@
     public void rollback(ConfigurationContext configContext) throws Exception {
         // Nothing to implement
     }
+
+    public String toString() {
+        return "ExceptionCommand";
+    }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/PrepareCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/PrepareCommand.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/PrepareCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/PrepareCommand.java Wed Jun  6 07:09:41 2007
@@ -41,4 +41,8 @@
     public void rollback(ConfigurationContext configContext) throws Exception {
         // Nothing to implement
     }
+
+    public String toString() {
+        return "PrepareCommand";
+    }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/RollbackCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/RollbackCommand.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/RollbackCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/RollbackCommand.java Wed Jun  6 07:09:41 2007
@@ -41,4 +41,8 @@
     public void rollback(ConfigurationContext configContext) throws Exception {
         // Nothing to implement
     }
+
+    public String toString() {
+        return "RollbackCommand";
+    }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java Wed Jun  6 07:09:41 2007
@@ -40,4 +40,8 @@
             }
         }
     }
+
+    public String toString() {
+        return "ContextClusteringCommandCollection(" + uniqueId + ")";
+    }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java Wed Jun  6 07:09:41 2007
@@ -16,36 +16,18 @@
 package org.apache.axis2.clustering.context.commands;
 
 import org.apache.axis2.clustering.ClusteringFault;
-import org.apache.axis2.clustering.context.ContextClusteringCommand;
-import org.apache.axis2.clustering.context.PropertyUpdater;
 import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.PropertyDifference;
-
-import java.util.HashMap;
 
 /**
  * 
  */
 public class UpdateConfigurationContextCommand extends UpdateContextCommand {
 
-    private PropertyUpdater propertyUpdater = new PropertyUpdater();
-
     public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
         propertyUpdater.updateProperties(configurationContext);
     }
 
-    public boolean isPropertiesEmpty(){
-        if (propertyUpdater.getProperties() == null) {
-            propertyUpdater.setProperties(new HashMap());
-            return true;
-        }
-        return propertyUpdater.getProperties().isEmpty();
-    }
-
-    public void addProperty(PropertyDifference diff) {
-        if (propertyUpdater.getProperties() == null) {
-            propertyUpdater.setProperties(new HashMap());
-        }
-        propertyUpdater.addContextProperty(diff);
+    public String toString() {
+        return "UpdateConfigurationContextCommand(" + uniqueId + ")";
     }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateContextCommand.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateContextCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateContextCommand.java Wed Jun  6 07:09:41 2007
@@ -16,14 +16,30 @@
 package org.apache.axis2.clustering.context.commands;
 
 import org.apache.axis2.clustering.context.ContextClusteringCommand;
+import org.apache.axis2.clustering.context.PropertyUpdater;
 import org.apache.axis2.context.PropertyDifference;
 
+import java.util.HashMap;
+
 /**
  * 
  */
 public abstract class UpdateContextCommand extends ContextClusteringCommand {
-    public abstract void addProperty(PropertyDifference diff);
-    
 
-    public abstract boolean isPropertiesEmpty();
+    protected PropertyUpdater propertyUpdater = new PropertyUpdater();
+
+    public boolean isPropertiesEmpty() {
+        if (propertyUpdater.getProperties() == null) {
+            propertyUpdater.setProperties(new HashMap());
+            return true;
+        }
+        return propertyUpdater.getProperties().isEmpty();
+    }
+
+    public void addProperty(PropertyDifference diff) {
+        if (propertyUpdater.getProperties() == null) {
+            propertyUpdater.setProperties(new HashMap());
+        }
+        propertyUpdater.addContextProperty(diff);
+    }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java Wed Jun  6 07:09:41 2007
@@ -18,17 +18,13 @@
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
 import org.apache.axis2.clustering.ClusteringFault;
-import org.apache.axis2.clustering.context.PropertyUpdater;
 import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.PropertyDifference;
 import org.apache.axis2.context.ServiceContext;
 import org.apache.axis2.context.ServiceGroupContext;
 import org.apache.axis2.description.AxisService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.util.HashMap;
-
 /**
  * 
  */
@@ -36,7 +32,6 @@
 
     private static final Log log = LogFactory.getLog(UpdateServiceContextCommand.class);
 
-    private PropertyUpdater propertyUpdater = new PropertyUpdater();
     protected String serviceGroupName;
     protected String serviceGroupContextId;
     protected String serviceName;
@@ -97,18 +92,7 @@
         }
     }
 
-    public boolean isPropertiesEmpty() {
-        if (propertyUpdater.getProperties() == null) {
-            propertyUpdater.setProperties(new HashMap());
-            return true;
-        }
-        return propertyUpdater.getProperties().isEmpty();
-    }
-
-    public void addProperty(PropertyDifference diff) {
-        if (propertyUpdater.getProperties() == null) {
-            propertyUpdater.setProperties(new HashMap());
-        }
-        propertyUpdater.addContextProperty(diff);
+    public String toString() {
+        return "UpdateServiceContextCommand(" + uniqueId + ")";
     }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java Wed Jun  6 07:09:41 2007
@@ -16,23 +16,18 @@
 package org.apache.axis2.clustering.context.commands;
 
 import org.apache.axis2.clustering.ClusteringFault;
-import org.apache.axis2.clustering.context.PropertyUpdater;
 import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.PropertyDifference;
 import org.apache.axis2.context.ServiceGroupContext;
 import org.apache.axis2.description.AxisServiceGroup;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.util.HashMap;
-
 /**
  * 
  */
 public class UpdateServiceGroupContextCommand extends UpdateContextCommand {
 
     private static Log log = LogFactory.getLog(UpdateServiceGroupContextCommand.class);
-    private PropertyUpdater propertyUpdater = new PropertyUpdater();
 
     protected String serviceGroupName;
     protected String serviceGroupContextId;
@@ -70,18 +65,7 @@
         propertyUpdater.updateProperties(sgCtx);
     }
 
-    public boolean isPropertiesEmpty() {
-        if (propertyUpdater.getProperties() == null) {
-            propertyUpdater.setProperties(new HashMap());
-            return true;
-        }
-        return propertyUpdater.getProperties().isEmpty();
-    }
-
-    public void addProperty(PropertyDifference diff) {
-        if (propertyUpdater.getProperties() == null) {
-            propertyUpdater.setProperties(new HashMap());
-        }
-        propertyUpdater.addContextProperty(diff);
+    public String toString() {
+        return "UpdateServiceGroupContextCommand(" + uniqueId + ")";
     }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java Wed Jun  6 07:09:41 2007
@@ -88,4 +88,8 @@
     public ContextClusteringCommand[] getCommands() {
         return commands;
     }
+
+    public String toString() {
+        return "GetStateCommand";
+    }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java Wed Jun  6 07:09:41 2007
@@ -45,4 +45,8 @@
     public void setCommands(ContextClusteringCommand[] commands) {
         this.commands = commands;
     }
+
+    public String toString() {
+        return "GetStateResponseCommand";
+    }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java Wed Jun  6 07:09:41 2007
@@ -21,6 +21,7 @@
 import org.apache.axis2.clustering.ClusteringFault;
 import org.apache.axis2.clustering.context.ContextManager;
 import org.apache.axis2.context.*;
+import org.apache.axis2.description.WSDL2Constants;
 import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.axis2.handlers.AbstractHandler;
 import org.apache.commons.logging.Log;
@@ -34,29 +35,51 @@
     private static final Log log = LogFactory.getLog(ReplicationHandler.class);
 
     public InvocationResponse invoke(MessageContext msgContext) throws AxisFault {
-        log.debug("Going to replicate state on invoke");
+//        System.err.println("### [INVOKE] Going to replicate state. Flow:" + msgContext.getFLOW());
+        /* log.debug("Going to replicate state on invoke");
         try {
             replicateState(msgContext);
         } catch (Exception e) {
             System.err.println("###########################");
-            e.printStackTrace();
+            String message = "Could not replicate the state";
+            log.error(message, e);
             System.err.println("###########################");
-        }
+        }*/
         return InvocationResponse.CONTINUE;
     }
 
     public void flowComplete(MessageContext msgContext) {
-        log.debug("Going to replicate state on flowComplete");
-        try {
-            replicateState(msgContext);
-        } catch (Exception e) {
-            String message = "Could not replicate the state";
-            log.error(message, e);
+        int flow = msgContext.getFLOW();
+        String mep = msgContext.getAxisOperation().getMessageExchangePattern();
+
+        // The ReplicationHandler should be added to all 4 flows. We will replicate on flowComplete
+        // only during one of the flows
+        boolean replicateOnInFLow =
+                ((mep.equals(WSDL2Constants.MEP_URI_IN_ONLY) ||
+                  mep.equals(WSDL2Constants.MEP_URI_IN_OPTIONAL_OUT) ||
+                  mep.equals(WSDL2Constants.MEP_URI_ROBUST_IN_ONLY))
+                 && (flow == MessageContext.IN_FLOW || flow == MessageContext.IN_FAULT_FLOW));
+        
+        boolean replicateOnOutFlow =
+                (mep.equals(WSDL2Constants.MEP_URI_IN_OUT) ||
+                 mep.equals(WSDL2Constants.MEP_URI_OUT_ONLY) ||
+                 mep.equals(WSDL2Constants.MEP_URI_OUT_OPTIONAL_IN) ||
+                 mep.equals(WSDL2Constants.MEP_URI_OUT_IN) ||
+                 mep.equals(WSDL2Constants.MEP_URI_ROBUST_OUT_ONLY))
+                && (flow == MessageContext.OUT_FLOW || flow == MessageContext.OUT_FAULT_FLOW);
+
+        if (replicateOnInFLow || replicateOnOutFlow) {
+            System.err.println("### [FLOW COMPLETE] Going to replicate state. Flow:" + flow);
+            try {
+                replicateState(msgContext);
+            } catch (Exception e) {
+                String message = "Could not replicate the state";
+                log.error(message, e);
+            }
         }
     }
 
     private void replicateState(MessageContext message) throws ClusteringFault {
-
         ConfigurationContext configurationContext = message.getConfigurationContext();
         AxisConfiguration axisConfiguration = configurationContext.getAxisConfiguration();
         ClusterManager clusterManager = axisConfiguration.getClusterManager();
@@ -91,26 +114,26 @@
 
             // Do the actual replication here
             if (!contexts.isEmpty()) {
-                String msgUUID = contextManager.
-                        updateContexts((AbstractContext[]) contexts.
+                String msgUUID =
+                        contextManager.updateContexts((AbstractContext[]) contexts.
                                 toArray(new AbstractContext[contexts.size()]));
 
                 long start = System.currentTimeMillis();
 
                 // Wait till all members have ACKed receipt & successful processing of
                 // the message with UUID 'msgUUID'
-                while (!contextManager.isMessageAcknowledged(msgUUID)) {
-                    if(System.currentTimeMillis() - start > 20000){
-                        throw new ClusteringFault("ACKs not received from all members within 20 sec. " +
-                                                  "Aborting wait.");
-                    }
+                /*do {
                     try {
-                        Thread.sleep(20);
+                        Thread.sleep(50);
                     } catch (InterruptedException e) {
                         log.error(e);
                         break;
                     }
-                }
+                    if (System.currentTimeMillis() - start > 20000) {
+                        throw new ClusteringFault("ACKs not received from all members within 20 sec. " +
+                                                  "Aborting wait.");
+                    }
+                } while (!contextManager.isMessageAcknowledged(msgUUID));*/
             }
 
         } else {

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java Wed Jun  6 07:09:41 2007
@@ -59,10 +59,15 @@
             for (int i = 0; i < members.length; i++) {
                 Member member = members[i];
                 if (!memberList.contains(member.getName())) {
-
+                    System.err.println("\n\n");
+                    System.err.println("##### NO ACK from member " + member.getName());
+                    System.err.println("#### ACKed member list=" + memberList);
+                    System.err.println("\n\n");
                     // At this point, resend the original message back to the node which has not
                     // sent an ACK
                     sender.sendToMember(ack.getCommand(), member);
+
+                    //TODO: Check whether this is a new member. If so, send the msg
                     isAcknowledged = false;
                     break;
                 } else {

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Wed Jun  6 07:09:41 2007
@@ -17,6 +17,7 @@
 package org.apache.axis2.clustering.tribes;
 
 import org.apache.axis2.clustering.ClusteringConstants;
+import org.apache.axis2.clustering.ClusteringFault;
 import org.apache.axis2.clustering.configuration.ConfigurationClusteringCommand;
 import org.apache.axis2.clustering.configuration.DefaultConfigurationManager;
 import org.apache.axis2.clustering.context.ContextClusteringCommand;
@@ -94,7 +95,20 @@
             && !(msg instanceof GetStateResponseCommand)) {
             return;
         }
-        log.debug("RECEIVED MESSAGE " + msg);
+        log.debug("RECEIVED MESSAGE " + msg + " from " + sender.getName());
+
+        // Need to process ACKs as soon as they are received since otherwise,
+        // unnecessary retransmissions will take place
+        if(msg instanceof AckCommand){
+            try {
+                controlCommandProcessor.process((AckCommand) msg, sender);
+            } catch (Exception e) {
+                log.error(e);
+            }
+            return;
+        }
+
+        // Add the commands to be processed to the cmdQueue
         synchronized (cmdQueue) {
             cmdQueue.enqueue(new MemberMessage(msg, sender));
         }
@@ -106,8 +120,8 @@
     private void startMessageProcessor() {
         messageProcessor = new Thread(new MessageProcessor(), "ClusteringInComingMessageProcessor");
         messageProcessor.setDaemon(true);
-        messageProcessor.start();
         messageProcessor.setPriority(Thread.MAX_PRIORITY);
+        messageProcessor.start();
     }
 
     /**
@@ -132,7 +146,7 @@
     }
 
     /**
-     * A processor which continously polls for messages in the cmdQueue and processes them
+     * A processor which continuously polls for messages in the cmdQueue and processes them
      */
     private class MessageProcessor implements Runnable {
         public void run() {

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Wed Jun  6 07:09:41 2007
@@ -22,9 +22,12 @@
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.Member;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 public class ChannelSender implements MessageSender {
 
+    private Log log = LogFactory.getLog(ChannelSender.class);
     private Channel channel;
 
     public void sendToGroup(ClusteringCommand msg) throws ClusteringFault {
@@ -32,7 +35,8 @@
         Member[] members = channel.getMembers();
         if (members.length > 0) {
             try {
-                channel.send(members, msg, 0);
+                channel.send(members, msg, Channel.DEFAULT);
+                log.debug("Sent " + msg + " to group");
             } catch (ChannelException e) {
                 String message = "Error sending command message : " + msg;
                 throw new ClusteringFault(message, e);
@@ -45,7 +49,8 @@
         try {
             channel.send(new Member[]{channel.getLocalMember(true)},
                          msg,
-                         Channel.SEND_OPTIONS_USE_ACK);
+                         Channel.DEFAULT);
+            log.debug("Sent " + msg + " to self");
         } catch (ChannelException e) {
             throw new ClusteringFault(e);
         }
@@ -57,6 +62,7 @@
         if (group.length > 0) {
             try {
                 channel.send(group, throwable, 0);
+                log.debug("Sent " + throwable + " to group");
             } catch (ChannelException e) {
                 String message = "Error sending exception message : " + throwable;
                 throw new ClusteringFault(message, e);
@@ -66,7 +72,8 @@
 
     public void sendToMember(ClusteringCommand cmd, Member member) throws ClusteringFault {
         try {
-            channel.send(new Member[]{member}, cmd, Channel.SEND_OPTIONS_USE_ACK);
+            channel.send(new Member[]{member}, cmd, Channel.DEFAULT);
+            log.debug("Sent " + cmd + " to " + member.getName());
         } catch (ChannelException e) {
             throw new ClusteringFault(e);
         }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Wed Jun  6 07:09:41 2007
@@ -37,10 +37,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Random;
+import java.util.*;
 
 public class TribesClusterManager implements ClusterManager {
     private static final Log log = LogFactory.getLog(TribesClusterManager.class);
@@ -94,9 +91,6 @@
                 domain = "apache.axis2.domain".getBytes();
             }
             channel.getMembershipService().setDomain(domain);
-
-//            ((McastService)channel.getMembershipService()).setPort(5000);
-
             DomainFilterInterceptor dfi = new DomainFilterInterceptor();
             dfi.setDomain(domain);
             channel.addInterceptor(dfi);
@@ -112,6 +106,16 @@
             nbc.setPrevious(dfi);
             channel.addInterceptor(nbc);*/
 
+            /*Properties mcastProps = channel.getMembershipService().getProperties();
+            mcastProps.setProperty("mcastPort", "5555");
+            mcastProps.setProperty("mcastAddress", "224.10.10.10");
+            mcastProps.setProperty("mcastClusterDomain", "catalina");
+            mcastProps.setProperty("bindAddress", "localhost");
+            mcastProps.setProperty("memberDropTime", "20000");
+            mcastProps.setProperty("mcastFrequency", "500");
+            mcastProps.setProperty("tcpListenPort", "4000");
+            mcastProps.setProperty("tcpListenHost", "127.0.0.1");*/
+
 //            TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
 //            tcpFailureDetector.setPrevious(nbc);
 //            channel.addInterceptor(tcpFailureDetector);
@@ -127,11 +131,16 @@
                 channelListener.setContextManager(contextManager);
 
                 Member[] members = channel.getMembers();
+                log.info("Local Tribes Member " + channel.getLocalMember(true).getName());
                 TribesUtil.printMembers(members);
 
                 // If there is at least one member in the Tribe, get the current state from a member
                 Random random = new Random();
                 int numberOfTries = 0; // Don't keep on trying infinitely
+
+                // Keep track of members to whom we already sent a GetStateCommand
+                // Do not send another request to these members
+                List sentMembersList = new ArrayList();
                 while (members.length > 0 &&
                        configurationContext.
                                getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null
@@ -141,9 +150,13 @@
                     try {
                         members = channel.getMembers();
                         int memberIndex = random.nextInt(members.length);
-                        sender.sendToMember(new GetStateCommand(), members[memberIndex]);
-                        log.debug("WAITING FOR STATE UPDATE...");
-                        Thread.sleep(200);
+                        Member member = members[memberIndex];
+                        if (!sentMembersList.contains(member.getName())) {
+                            sender.sendToMember(new GetStateCommand(), member);
+                            sentMembersList.add(member.getName());
+                            log.debug("WAITING FOR STATE UPDATE...");
+                            Thread.sleep(1000);
+                        }
                     } catch (Exception e) {
                         e.printStackTrace();
                         break;

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextClusteringCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextClusteringCommand.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextClusteringCommand.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextClusteringCommand.java Wed Jun  6 07:09:41 2007
@@ -22,7 +22,7 @@
 
 public abstract class ContextClusteringCommand extends ClusteringCommand {
 
-    private String uniqueId;
+    protected String uniqueId;
 
     public abstract void execute(ConfigurationContext configContext) throws ClusteringFault;
 

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/description/AxisOperation.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/description/AxisOperation.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/description/AxisOperation.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/description/AxisOperation.java Wed Jun  6 07:09:41 2007
@@ -418,7 +418,7 @@
      * Further, in the first lookup, it will cache the looked
      * up value so that the subsequent method calls are extremely efficient.
      */
-    public int getAxisSpecifMEPConstant() {
+    public int getAxisSpecificMEPConstant() {
         if (this.mep != WSDLConstants.MEP_CONSTANT_INVALID) {
             return this.mep;
         }



---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org