You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2007/06/18 08:59:40 UTC

svn commit: r548236 - in /webservices/axis2/trunk/java/modules: clustering/src/org/apache/axis2/clustering/context/ clustering/src/org/apache/axis2/clustering/tribes/ kernel/src/org/apache/axis2/clustering/ kernel/src/org/apache/axis2/clustering/context/

Author: azeez
Date: Sun Jun 17 23:59:35 2007
New Revision: 548236

URL: http://svn.apache.org/viewvc?view=rev&rev=548236
Log:
Incorporating a WAIT time to check whether an ACK has arrived, based on the last TTS (time to send). This is a response time improvement.


Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/MessageSender.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java?view=diff&rev=548236&r1=548235&r2=548236
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java Sun Jun 17 23:59:35 2007
@@ -116,6 +116,7 @@
         if (!includeAllProperties) {
 
             // Sometimes, there can be failures, so if an exception occurs, we retry
+            // TODO: Need to investigate these failures. Looks like this is becuase the contexts are not synchronized
             while (true) {
                 Map diffs = context.getPropertyDifferences();
                 try {

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java?view=diff&rev=548236&r1=548235&r2=548236
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java Sun Jun 17 23:59:35 2007
@@ -19,6 +19,7 @@
 import org.apache.axiom.om.OMElement;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.ClusteringConstants;
 import org.apache.axis2.clustering.context.commands.ContextClusteringCommandCollection;
 import org.apache.axis2.clustering.tribes.AckManager;
 import org.apache.axis2.clustering.tribes.ChannelSender;
@@ -147,7 +148,9 @@
                             }
                         } while (sender == null);
                         try {
-                            sender.sendToGroup(cmd);
+                            long tts = sender.sendToGroup(cmd);
+                            configContext.setNonReplicableProperty(ClusteringConstants.TIME_TO_SEND,
+                                                                   new Long(tts));
                         } catch (ClusteringFault clusteringFault) {
                             throw new RuntimeException(clusteringFault);
                         }
@@ -155,7 +158,9 @@
                 };
                 processorThread.start();
             } else {
-                sender.sendToGroup(cmd);
+                long tts = sender.sendToGroup(cmd);
+                configContext.setNonReplicableProperty(ClusteringConstants.TIME_TO_SEND,
+                                                       new Long(tts));
             }
         }
     }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java?view=diff&rev=548236&r1=548235&r2=548236
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java Sun Jun 17 23:59:35 2007
@@ -68,11 +68,15 @@
                     log.debug("[NO ACK] from member " + memberHost);
                     log.debug("ACKed member list=" + memberList);
 
-                    // At this point, resend the original message back to the node which has not
-                    // sent an ACK
-                    sender.sendToMember(ack.getCommand(), member);
+                    // If a new member joined the cluster recently,
+                    // we need to retransmit the message to this member, if an ACK has not been
+                    // received from this member.
+                    if (member.getMemberAliveTime() < 1000) { // TODO: Check
+                        sender.sendToMember(ack.getCommand(), member);
+                        log.debug("Retransimitting msg " + ack.getCommand().getUniqueId() +
+                                  " to member " + memberHost);
+                    }
 
-                    //TODO: Enhancement, Check whether this is a new member. If then send the msg
                     isAcknowledged = false;
                     break;
                 } else {

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?view=diff&rev=548236&r1=548235&r2=548236
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Sun Jun 17 23:59:35 2007
@@ -17,6 +17,7 @@
 package org.apache.axis2.clustering.tribes;
 
 import org.apache.axis2.clustering.ClusteringConstants;
+import org.apache.axis2.clustering.ClusteringFault;
 import org.apache.axis2.clustering.configuration.ConfigurationClusteringCommand;
 import org.apache.axis2.clustering.configuration.DefaultConfigurationManager;
 import org.apache.axis2.clustering.context.ContextClusteringCommand;
@@ -41,7 +42,7 @@
     private DefaultContextManager contextManager;
     private DefaultConfigurationManager configurationManager;
     private TribesControlCommandProcessor controlCommandProcessor;
-    private ChannelSender sender;
+    private ChannelSender channelSender;
 
     /**
      * The messages received are enqued. Another thread, messageProcessor, will
@@ -64,9 +65,9 @@
         this.configurationManager = configurationManager;
         this.contextManager = contextManager;
         this.controlCommandProcessor = controlCommandProcessor;
-        this.sender = sender;
+        this.channelSender = sender;
         this.configurationContext = configurationContext;
-        startMessageProcessor();
+//        startMessageProcessor();
     }
 
     public void setContextManager(DefaultContextManager contextManager) {
@@ -98,22 +99,28 @@
 
         // Need to process ACKs as soon as they are received since otherwise,
         // unnecessary retransmissions will take place
-        if(msg instanceof AckCommand){
+        /*if (msg instanceof AckCommand) {
             try {
                 controlCommandProcessor.process((AckCommand) msg, sender);
             } catch (Exception e) {
                 log.error(e);
             }
             return;
+        }*/
+
+        try {
+            processMessage(msg,sender);
+        } catch (Exception e) {
+            log.error(e);
         }
 
         // Add the commands to be precessed to the cmdQueue
-        synchronized (cmdQueue) {
+        /*synchronized (cmdQueue) {
             cmdQueue.enqueue(new MemberMessage(msg, sender));
         }
         if (!messageProcessor.isAlive()) {
             startMessageProcessor();
-        }
+        }*/
     }
 
     private void startMessageProcessor() {
@@ -123,6 +130,32 @@
         messageProcessor.start();
     }
 
+
+    private void processMessage(Serializable msg, Member sender) throws ClusteringFault {
+        long start = System.currentTimeMillis();
+        if (msg instanceof ContextClusteringCommand && contextManager != null) {
+            ContextClusteringCommand ctxCmd = (ContextClusteringCommand) msg;
+            contextManager.process(ctxCmd);
+
+            // Sending ACKs for ContextClusteringCommandCollection or
+            // UpdateContextCommand is sufficient
+            if (msg instanceof ContextClusteringCommandCollection ||
+                msg instanceof UpdateContextCommand) {
+                AckCommand ackCmd = new AckCommand(ctxCmd.getUniqueId());
+
+                // Send the ACK
+                this.channelSender.sendToMember(ackCmd, sender);
+            }
+        } else if (msg instanceof ConfigurationClusteringCommand &&
+                   configurationManager != null) {
+            configurationManager.process((ConfigurationClusteringCommand) msg);
+        } else if (msg instanceof ControlCommand && controlCommandProcessor != null) {
+            controlCommandProcessor.process((ControlCommand) msg,
+                                            sender);
+        }
+        System.err.println("######### Time to process=" + (System.currentTimeMillis() - start));
+    }
+
     /**
      * A container to hold a message and its sender
      */
@@ -158,28 +191,7 @@
                         Thread.sleep(1);
                         continue;
                     }
-
-                    Serializable msg = memberMessage.getMessage();
-                    if (msg instanceof ContextClusteringCommand && contextManager != null) {
-                        ContextClusteringCommand ctxCmd = (ContextClusteringCommand) msg;
-                        contextManager.process(ctxCmd);
-
-                        // Sending ACKs for ContextClusteringCommandCollection or
-                        // UpdateContextCommand is sufficient
-                        if (msg instanceof ContextClusteringCommandCollection ||
-                            msg instanceof UpdateContextCommand) {
-                            AckCommand ackCmd = new AckCommand(ctxCmd.getUniqueId());
-
-                            // Send the ACK
-                            sender.sendToMember(ackCmd, memberMessage.getSender());
-                        }
-                    } else if (msg instanceof ConfigurationClusteringCommand &&
-                               configurationManager != null) {
-                        configurationManager.process((ConfigurationClusteringCommand) msg);
-                    } else if (msg instanceof ControlCommand && controlCommandProcessor != null) {
-                        controlCommandProcessor.process((ControlCommand) msg,
-                                                        memberMessage.getSender());
-                    }
+                    processMessage(memberMessage.getMessage(), memberMessage.getSender());
                 } catch (Throwable e) {
                     log.error("Could not process message ", e);
                 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?view=diff&rev=548236&r1=548235&r2=548236
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Sun Jun 17 23:59:35 2007
@@ -28,17 +28,20 @@
     private Log log = LogFactory.getLog(ChannelSender.class);
     private Channel channel;
 
-    public void sendToGroup(ClusteringCommand msg) throws ClusteringFault {
+    public long sendToGroup(ClusteringCommand msg) throws ClusteringFault {
         if (channel == null) {
-            return;
+            return 0;
         }
+        long timeToSend = 0;
 
         // Keep retrying, since at the point of trying to send the msg, a member may leave the group
         // causing a view change. All nodes in a view should get the msg
         while (true) {
             if (channel.getMembers().length > 0) {
                 try {
-                    channel.send(channel.getMembers(), msg, 0); 
+                    long start = System.currentTimeMillis();
+                    channel.send(channel.getMembers(), msg, Channel.SEND_OPTIONS_USE_ACK);
+                    timeToSend = System.currentTimeMillis() - start;
                     log.debug("Sent " + msg + " to group");
                     break;
                 } catch (ChannelException e) {
@@ -50,6 +53,7 @@
                 break;
             }
         }
+        return timeToSend;
     }
 
     public void sendToSelf(ClusteringCommand msg) throws ClusteringFault {
@@ -59,23 +63,27 @@
         try {
             channel.send(new Member[]{channel.getLocalMember(true)},
                          msg,
-                         0);
+                         Channel.SEND_OPTIONS_USE_ACK);
             log.debug("Sent " + msg + " to self");
         } catch (ChannelException e) {
             throw new ClusteringFault(e);
         }
     }
 
-    public void sendToGroup(Throwable throwable) throws ClusteringFault {
+    public long sendToGroup(Throwable throwable) throws ClusteringFault {
         if (channel == null) {
-            return;
+            return 0;
         }
 
+        long timeToSend = 0;
+
         // Keep retrying, since at the point of trying to send the msg, a member may leave the group
         while (true) {
             if (channel.getMembers().length > 0) {
                 try {
-                    channel.send(channel.getMembers(), throwable, 0);
+                    long start = System.currentTimeMillis();
+                    channel.send(channel.getMembers(), throwable, Channel.SEND_OPTIONS_USE_ACK);
+                    timeToSend = System.currentTimeMillis() - start;
                     log.debug("Sent " + throwable + " to group");
                 } catch (ChannelException e) {
                     String message = "Error sending exception message : " + throwable +
@@ -86,12 +94,16 @@
                 break;
             }
         }
+        return timeToSend;
     }
 
-    public void sendToMember(ClusteringCommand cmd, Member member) throws ClusteringFault {
+    public long sendToMember(ClusteringCommand cmd, Member member) throws ClusteringFault {
+        long timeToSend = 0;
         try {
             if (member.isReady()) {
-                channel.send(new Member[]{member}, cmd, 0);
+                long start = System.currentTimeMillis();
+                channel.send(new Member[]{member}, cmd, Channel.SEND_OPTIONS_USE_ACK);
+                timeToSend = System.currentTimeMillis() - start;
                 log.debug("Sent " + cmd + " to " + TribesUtil.getHost(member));
             }
         } catch (ChannelException e) {
@@ -99,6 +111,7 @@
                              ". Reason " + e.getMessage();
             log.warn(message);
         }
+        return timeToSend;
     }
 
     public Channel getChannel() {

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?view=diff&rev=548236&r1=548235&r2=548236
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Sun Jun 17 23:59:35 2007
@@ -152,10 +152,13 @@
                         int memberIndex = random.nextInt(members.length);
                         Member member = members[memberIndex];
                         if (!sentMembersList.contains(TribesUtil.getHost(member))) {
-                            sender.sendToMember(new GetStateCommand(), member);
+                            long tts = sender.sendToMember(new GetStateCommand(), member);
+                            configurationContext.
+                                    setNonReplicableProperty(ClusteringConstants.TIME_TO_SEND,
+                                                             new Long(tts));
                             sentMembersList.add(TribesUtil.getHost(member));
                             log.debug("WAITING FOR STATE UPDATE...");
-                            Thread.sleep(1000);
+                            Thread.sleep(tts + 5);
                         }
                     } catch (Exception e) {
                         e.printStackTrace();

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java?view=diff&rev=548236&r1=548235&r2=548236
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java Sun Jun 17 23:59:35 2007
@@ -25,4 +25,5 @@
     public static final String AVOID_INITIATION_KEY = "AvoidInitiation";
     public static final String DOMAIN = "domain";
     public static final String CLUSTER_INITIALIZED = "local_cluster.initialized";
+    public static final String TIME_TO_SEND = "local_cluster.time.to.send";
 }

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/MessageSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/MessageSender.java?view=diff&rev=548236&r1=548235&r2=548236
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/MessageSender.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/MessageSender.java Sun Jun 17 23:59:35 2007
@@ -20,9 +20,9 @@
  */
 public interface MessageSender {
 
-    public void sendToGroup(ClusteringCommand msg) throws ClusteringFault;
+    public long sendToGroup(ClusteringCommand msg) throws ClusteringFault;
 
     public void sendToSelf(ClusteringCommand msg) throws ClusteringFault;
 
-    public void sendToGroup(Throwable throwable) throws ClusteringFault;
+    public long sendToGroup(Throwable throwable) throws ClusteringFault;
 }

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java?view=diff&rev=548236&r1=548235&r2=548236
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java Sun Jun 17 23:59:35 2007
@@ -18,6 +18,7 @@
 
 import org.apache.axis2.clustering.ClusterManager;
 import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.ClusteringConstants;
 import org.apache.axis2.context.AbstractContext;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
@@ -88,7 +89,7 @@
             }
             if (!abstractContext.getPropertyDifferences().isEmpty()) {
                 String msgUUID = contextManager.updateContext(abstractContext);
-                waitForACKs(contextManager, msgUUID);
+                waitForACKs(contextManager, msgUUID, abstractContext.getRootContext());
             }
         } else {
             String msg = "Cannot replicate contexts since " +
@@ -135,7 +136,7 @@
                 String msgUUID =
                         contextManager.updateContexts((AbstractContext[]) contexts.
                                 toArray(new AbstractContext[contexts.size()]));
-                waitForACKs(contextManager, msgUUID);
+                waitForACKs(contextManager, msgUUID, msgContext.getRootContext());
             }
 
         } else {
@@ -146,7 +147,8 @@
     }
 
     private static void waitForACKs(ContextManager contextManager,
-                                    String msgUUID) throws ClusteringFault {
+                                    String msgUUID,
+                                    ConfigurationContext configCtx) throws ClusteringFault {
         long start = System.currentTimeMillis();
 
         // Wait till all members have ACKed receipt & successful processing of
@@ -155,11 +157,17 @@
 
             // Wait sometime before checking whether message is ACKed
             try {
-                Thread.sleep(50);
+                Long tts =
+                        (Long) configCtx.getPropertyNonReplicable(ClusteringConstants.TIME_TO_SEND);
+                if (tts == null) {
+                    Thread.sleep(40);
+                } else if (tts.longValue() >= 0) {
+                    Thread.sleep(tts.longValue() + 5); // Time to recv ACK + time in queue & processing replication request
+                }
             } catch (InterruptedException ignored) {
             }
-            if (System.currentTimeMillis() - start > 40000) {
-                throw new ClusteringFault("ACKs not received from all members within 40 sec. " +
+            if (System.currentTimeMillis() - start > 15000) {
+                throw new ClusteringFault("ACKs not received from all members within 15 sec. " +
                                           "Aborting wait.");
             }
         } while (!contextManager.isMessageAcknowledged(msgUUID));



---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org