You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2007/06/08 13:51:08 UTC

svn commit: r545487 - in /webservices/axis2/trunk/java/modules: clustering/src/org/apache/axis2/clustering/context/ clustering/src/org/apache/axis2/clustering/handlers/ clustering/src/org/apache/axis2/clustering/tribes/ kernel/src/org/apache/axis2/clus...

Author: azeez
Date: Fri Jun  8 04:51:07 2007
New Revision: 545487

URL: http://svn.apache.org/viewvc?view=rev&rev=545487
Log:
Skip the replication logic when there is only a single member in the cluster.
Do not keep track of property differences when context replication is disabled or when there are no members in the cluster.


Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/context/AbstractContext.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java?view=diff&rev=545487&r1=545486&r2=545487
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java Fri Jun  8 04:51:07 2007
@@ -114,38 +114,44 @@
                                        Map excludedPropertyPatterns,
                                        boolean includeAllProperties) {
         if (!includeAllProperties) {
-            Map diffs = context.getPropertyDifferences();
-            synchronized (context) {
-                for (Iterator iter = diffs.keySet().iterator(); iter.hasNext();) {
-                    String key = (String) iter.next();
-                    Object prop = context.getPropertyNonReplicable(key);
-                    if (prop instanceof Serializable) { // First check whether it is serializable
 
-                        // Next check whether it matches an excluded pattern
-                        if (!isExcluded(key, context.getClass().getName(), excludedPropertyPatterns))
-                        {
-                            log.debug("sending property =" + key + "-" + prop);
-                            PropertyDifference diff = (PropertyDifference) diffs.get(key);
-                            diff.setValue(prop);
-                            updateCmd.addProperty(diff);
+            // Sometimes, there can be failures, so if an exception occurs, we retry
+            while (true) {
+                Map diffs = context.getPropertyDifferences();
+                try {
+                    for (Iterator iter = diffs.keySet().iterator(); iter.hasNext();) {
+                        String key = (String) iter.next();
+                        Object prop = context.getPropertyNonReplicable(key);
+
+                        // First check whether it is serializable
+                        if (prop instanceof Serializable) {
+
+                            // Next check whether it matches an excluded pattern
+                            if (!isExcluded(key,
+                                            context.getClass().getName(),
+                                            excludedPropertyPatterns)) {
+                                log.debug("sending property =" + key + "-" + prop);
+                                PropertyDifference diff = (PropertyDifference) diffs.get(key);
+                                diff.setValue(prop);
+                                updateCmd.addProperty(diff);
+                            }
                         }
                     }
+                    break;
+                } catch (Exception ignored) {
                 }
             }
         } else {
-            synchronized (context) {
-                for (Iterator iter = context.getPropertyNames(); iter.hasNext();) {
-                    String key = (String) iter.next();
-                    Object prop = context.getPropertyNonReplicable(key);
-                    if (prop instanceof Serializable) { // First check whether it is serializable
+            for (Iterator iter = context.getPropertyNames(); iter.hasNext();) {
+                String key = (String) iter.next();
+                Object prop = context.getPropertyNonReplicable(key);
+                if (prop instanceof Serializable) { // First check whether it is serializable
 
-                        // Next check whether it matches an excluded pattern
-                        if (!isExcluded(key, context.getClass().getName(), excludedPropertyPatterns))
-                        {
-                            log.debug("sending property =" + key + "-" + prop);
-                            PropertyDifference diff = new PropertyDifference(key, prop, false);
-                            updateCmd.addProperty(diff);
-                        }
+                    // Next check whether it matches an excluded pattern
+                    if (!isExcluded(key, context.getClass().getName(), excludedPropertyPatterns)) {
+                        log.debug("sending property =" + key + "-" + prop);
+                        PropertyDifference diff = new PropertyDifference(key, prop, false);
+                        updateCmd.addProperty(diff);
                     }
                 }
             }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java?view=diff&rev=545487&r1=545486&r2=545487
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java Fri Jun  8 04:51:07 2007
@@ -49,6 +49,14 @@
     }
 
     public void flowComplete(MessageContext msgContext) {
+
+        // If there are no members, we need not do any replication
+        ClusterManager clusterManager =
+                msgContext.getConfigurationContext().getAxisConfiguration().getClusterManager();
+        if(clusterManager != null && clusterManager.getMemberCount() == 0){
+             return;
+        }
+
         int flow = msgContext.getFLOW();
         String mep = msgContext.getAxisOperation().getMessageExchangePattern();
 
@@ -59,7 +67,7 @@
                   mep.equals(WSDL2Constants.MEP_URI_IN_OPTIONAL_OUT) ||
                   mep.equals(WSDL2Constants.MEP_URI_ROBUST_IN_ONLY))
                  && (flow == MessageContext.IN_FLOW || flow == MessageContext.IN_FAULT_FLOW));
-        
+
         boolean replicateOnOutFlow =
                 (mep.equals(WSDL2Constants.MEP_URI_IN_OUT) ||
                  mep.equals(WSDL2Constants.MEP_URI_OUT_ONLY) ||
@@ -69,7 +77,7 @@
                 && (flow == MessageContext.OUT_FLOW || flow == MessageContext.OUT_FAULT_FLOW);
 
         if (replicateOnInFLow || replicateOnOutFlow) {
-            System.err.println("### [FLOW COMPLETE] Going to replicate state. Flow:" + flow);
+            log.debug("### [FLOW COMPLETE] Going to replicate state. Flow:" + flow);
             try {
                 replicateState(msgContext);
             } catch (Exception e) {
@@ -122,7 +130,7 @@
 
                 // Wait till all members have ACKed receipt & successful processing of
                 // the message with UUID 'msgUUID'
-                /*do {
+                do {
                     try {
                         Thread.sleep(50);
                     } catch (InterruptedException e) {
@@ -133,7 +141,7 @@
                         throw new ClusteringFault("ACKs not received from all members within 20 sec. " +
                                                   "Aborting wait.");
                     }
-                } while (!contextManager.isMessageAcknowledged(msgUUID));*/
+                } while (!contextManager.isMessageAcknowledged(msgUUID));
             }
 
         } else {

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java?view=diff&rev=545487&r1=545486&r2=545487
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java Fri Jun  8 04:51:07 2007
@@ -18,6 +18,8 @@
 import org.apache.axis2.clustering.ClusteringFault;
 import org.apache.axis2.clustering.context.ContextClusteringCommand;
 import org.apache.catalina.tribes.Member;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import java.util.Hashtable;
 import java.util.List;
@@ -25,10 +27,10 @@
 import java.util.Vector;
 
 /**
- * 
+ *
  */
 public final class AckManager {
-
+    private static Log log = LogFactory.getLog(AckManager.class);
     private static Map messageAckTable = new Hashtable();
 
     public static void addInitialAcknowledgement(ContextClusteringCommand command) {
@@ -40,7 +42,9 @@
         MessageACK ack = (MessageACK) messageAckTable.get(messageUniqueId);
         if (ack != null) {
             List memberList = ack.getMemberList();
-            memberList.add(memberId);
+            if (!memberList.contains(memberId)) {  // If the member has not already ACKed
+                memberList.add(memberId);
+            }
         }
     }
 
@@ -59,15 +63,15 @@
             for (int i = 0; i < members.length; i++) {
                 Member member = members[i];
                 if (!memberList.contains(member.getName())) {
-                    System.err.println("\n\n");
-                    System.err.println("##### NO ACK from member " + member.getName());
-                    System.err.println("#### ACKed member list=" + memberList);
-                    System.err.println("\n\n");
+                    log.debug("[NO ACK] from member " + member.getName());
+                    log.debug("ACKed member list=" + memberList);
                     // At this point, resend the original message back to the node which has not
                     // sent an ACK
-                    sender.sendToMember(ack.getCommand(), member);
+                    if (member.isReady()) {
+                        sender.sendToMember(ack.getCommand(), member);
+                    }
 
-                    //TODO: Check whether this is a new member. If then send the msg
+                    //TODO: Enhancement, Check whether this is a new member. If then send the msg
                     isAcknowledged = false;
                     break;
                 } else {

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?view=diff&rev=545487&r1=545486&r2=545487
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Fri Jun  8 04:51:07 2007
@@ -31,21 +31,32 @@
     private Channel channel;
 
     public void sendToGroup(ClusteringCommand msg) throws ClusteringFault {
-        if(channel == null) return;
-        Member[] members = channel.getMembers();
-        if (members.length > 0) {
-            try {
-                channel.send(members, msg, Channel.DEFAULT);
-                log.debug("Sent " + msg + " to group");
-            } catch (ChannelException e) {
-                String message = "Error sending command message : " + msg;
-                throw new ClusteringFault(message, e);
+        if (channel == null) {
+            return;
+        }
+
+        // Keep retrying, since at the point of trying to send the msg, a member may leave the group
+        while (true) {
+            if (channel.getMembers().length > 0) {
+                try {
+                    channel.send(channel.getMembers(), msg, Channel.DEFAULT);
+                    log.debug("Sent " + msg + " to group");
+                    break;
+                } catch (ChannelException e) {
+                    String message = "Error sending command message : " + msg +
+                                     ". Reason " + e.getMessage();
+                    log.warn(message);
+                }
+            } else {
+                break;
             }
         }
     }
 
     public void sendToSelf(ClusteringCommand msg) throws ClusteringFault {
-        if(channel == null) return;
+        if (channel == null) {
+            return;
+        }
         try {
             channel.send(new Member[]{channel.getLocalMember(true)},
                          msg,
@@ -57,15 +68,23 @@
     }
 
     public void sendToGroup(Throwable throwable) throws ClusteringFault {
-        if(channel == null) return;
-        Member[] group = channel.getMembers();
-        if (group.length > 0) {
-            try {
-                channel.send(group, throwable, 0);
-                log.debug("Sent " + throwable + " to group");
-            } catch (ChannelException e) {
-                String message = "Error sending exception message : " + throwable;
-                throw new ClusteringFault(message, e);
+        if (channel == null) {
+            return;
+        }
+
+        // Keep retrying, since at the point of trying to send the msg, a member may leave the group
+        while (true) {
+            if (channel.getMembers().length > 0) {
+                try {
+                    channel.send(channel.getMembers(), throwable, 0);
+                    log.debug("Sent " + throwable + " to group");
+                } catch (ChannelException e) {
+                    String message = "Error sending exception message : " + throwable +
+                                     ". Reason " + e.getMessage();
+                    log.warn(message);
+                }
+            } else {
+                break;
             }
         }
     }
@@ -75,7 +94,9 @@
             channel.send(new Member[]{member}, cmd, Channel.DEFAULT);
             log.debug("Sent " + cmd + " to " + member.getName());
         } catch (ChannelException e) {
-            throw new ClusteringFault(e);
+            String message = "Could not send message to " + member.getName() +
+                             ". Reason " + e.getMessage();
+            log.warn(message);
         }
     }
 

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?view=diff&rev=545487&r1=545486&r2=545487
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Fri Jun  8 04:51:07 2007
@@ -239,4 +239,8 @@
             channelListener.setConfigurationContext(configurationContext);
         }
     }
+
+    public int getMemberCount() {
+        return channel.getMembers().length;
+    }
 }

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java?view=diff&rev=545487&r1=545486&r2=545487
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java Fri Jun  8 04:51:07 2007
@@ -66,4 +66,11 @@
      * @param configurationContext
      */
     void setConfigurationContext(ConfigurationContext configurationContext);
+
+    /**
+     * Get the total number of members in the cluster
+     * 
+     * @return The total number of members in the cluster
+     */
+    int getMemberCount();
 }

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/context/AbstractContext.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/context/AbstractContext.java?view=diff&rev=545487&r1=545486&r2=545487
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/context/AbstractContext.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/context/AbstractContext.java Fri Jun  8 04:51:07 2007
@@ -77,7 +77,7 @@
      * @return Iterator over a collection of keys
      */
     public Iterator getPropertyNames() {
-        if(properties == null){
+        if (properties == null) {
             properties = new HashMap();
         }
         return properties.keySet().iterator();
@@ -97,7 +97,7 @@
 
             // Assume that a property is which is read may be updated.
             // i.e. The object pointed to by 'value' may be modified after it is read
-            propertyDifferences.put(key, new PropertyDifference(key, false));
+            addPropertyDifference(key);
         }
         return obj;
     }
@@ -128,7 +128,18 @@
             this.properties = new HashMap();
         }
         properties.put(key, value);
-        propertyDifferences.put(key, new PropertyDifference(key, false));
+        addPropertyDifference(key);
+    }
+
+    private void addPropertyDifference(String key) {
+        // Add the property differences only if Context replication is enabled,
+        // and there are members in the cluster
+        ClusterManager clusterManager = getRootContext().getAxisConfiguration().getClusterManager();
+        if (clusterManager != null &&
+            clusterManager.getContextManager() != null &&
+            clusterManager.getMemberCount() != 0) {
+            propertyDifferences.put(key, new PropertyDifference(key, false));
+        }
     }
 
     /**



---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org