You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/01/12 07:24:56 UTC

svn commit: r611371 - in /webservices/axis2/trunk/java/modules: clustering/src/org/apache/axis2/clustering/tribes/ kernel/src/org/apache/axis2/clustering/

Author: azeez
Date: Fri Jan 11 22:24:54 2008
New Revision: 611371

URL: http://svn.apache.org/viewvc?rev=611371&view=rev
Log:
1. Removing unnecessary method from ClusterManager
2. Handling SYNC_ACK & ASYNC type message sending


Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?rev=611371&r1=611370&r2=611371&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Fri Jan 11 22:24:54 2008
@@ -38,6 +38,12 @@
 
     private Log log = LogFactory.getLog(ChannelSender.class);
     private Channel channel;
+    private boolean synchronizeAllMembers;
+
+    public ChannelSender(Channel channel, boolean synchronizeAllMembers) {
+        this.channel = channel;
+        this.synchronizeAllMembers = synchronizeAllMembers;
+    }
 
     public void sendToGroup(ClusteringCommand msg) throws ClusteringFault {
         if (channel == null) {
@@ -49,8 +55,12 @@
         // causing a view change. All nodes in a view should get the msg
         if (members.length > 0) {
             try {
-                channel.send(members, toByteMessage(msg),
-                             Channel.SEND_OPTIONS_USE_ACK | Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+                if (synchronizeAllMembers) {
+                    channel.send(members, toByteMessage(msg),
+                                 Channel.SEND_OPTIONS_USE_ACK | Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+                } else {
+                    channel.send(members, toByteMessage(msg), Channel.SEND_OPTIONS_ASYNCHRONOUS);
+                }
                 if (log.isDebugEnabled()) {
                     log.debug("Sent " + msg + " to group");
                 }
@@ -61,13 +71,13 @@
                 throw new ClusteringFault(message, e);
             } catch (ChannelException e) {
                 log.error("Could not send message to some members", e);
-//                ChannelException.FaultyMember[] faultyMembers = e.getFaultyMembers();
-//                for (int i = 0; i < faultyMembers.length; i++) {
-//                    ChannelException.FaultyMember faultyMember = faultyMembers[i];
-//                    Member member = faultyMember.getMember();
-//                    log.error("Member " + TribesUtil.getHost(member) + " is faulty. Cause " +
-//                              faultyMember.getCause(), faultyMember.getCause());
-//                }
+                ChannelException.FaultyMember[] faultyMembers = e.getFaultyMembers();
+                for (int i = 0; i < faultyMembers.length; i++) {
+                    ChannelException.FaultyMember faultyMember = faultyMembers[i];
+                    Member member = faultyMember.getMember();
+                    log.error("Member " + TribesUtil.getHost(member) + " is faulty",
+                              faultyMember.getCause());
+                }
             } catch (Exception e) {
                 String message = "Error sending command message : " + msg +
                                  ". Reason " + e.getMessage();
@@ -117,24 +127,13 @@
             throw new ClusteringFault(message, e);
         } catch (ChannelException e) {
             log.error("Could not send message to " + TribesUtil.getHost(member));
-//            ChannelException.FaultyMember[] faultyMembers = e.getFaultyMembers();
-//            log.error("Member " + TribesUtil.getHost(member) + " is faulty. Cause " +
-//                      faultyMembers[0].getCause(), faultyMembers[0].getCause());
-//            if (!(e.getCause() instanceof java.net.ConnectException)) { // If it is not a connection exception, we try to resend the message
-//                sendToMember(cmd, member);
-//            }
+            ChannelException.FaultyMember[] faultyMembers = e.getFaultyMembers();
+            log.error("Member " + TribesUtil.getHost(member) + " is faulty",
+                      faultyMembers[0].getCause());
         } catch (Exception e) {
             String message = "Could not send message to " + TribesUtil.getHost(member) +
                              ". Reason " + e.getMessage();
             log.warn(message, e);
         }
-    }
-
-    public Channel getChannel() {
-        return channel;
-    }
-
-    public void setChannel(Channel channel) {
-        this.channel = channel;
     }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=611371&r1=611370&r2=611371&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Fri Jan 11 22:24:54 2008
@@ -123,14 +123,12 @@
             }
         }
 
-        channelSender = new ChannelSender();
-        channelListener = new ChannelListener(configurationContext,
-                                              configurationManager,
-                                              contextManager,
-                                              controlCmdProcessor);
+        channel = new GroupChannel();
+        channelSender = new ChannelSender(channel, synchronizeAllMembers());
+        channelListener = new ChannelListener(configurationContext, configurationManager,
+                                              contextManager, controlCmdProcessor);
 
         controlCmdProcessor.setChannelSender(channelSender);
-        channel = new GroupChannel();
 
         // Set the maximum number of retries, if message sending to a particular node fails
         Parameter maxRetriesParam = getParameter("maxRetries");
@@ -186,7 +184,6 @@
 
 //        OrderInterceptor orderInterceptor = new OrderInterceptor();
 
-
         // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
         AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor(channel);
         channel.addInterceptor(atMostOnceInterceptor);
@@ -215,7 +212,6 @@
         } catch (ChannelException e) {
             throw new ClusteringFault("Error starting Tribes channel", e);
         }
-        channelSender.setChannel(channel);
 
         log.info("Local Tribes Member " + TribesUtil.getLocalHost(channel));
         TribesUtil.printMembers();
@@ -351,6 +347,14 @@
         }
     }
 
+    /**
+     * Method to check whether all members in the cluster have to be kep in sync at all times.
+     * Typically, this will require each member in the cluster to ACKnowledge receipt of a
+     * particular message, which may have a significant performance hit.
+     *
+     * @return true - if all members in the cluster should be kept in sync at all times, false
+     *         otherwise
+     */
     public boolean synchronizeAllMembers() {
         Parameter syncAllParam = getParameter(ClusteringConstants.SYNCHRONIZE_ALL_MEMBERS);
         return syncAllParam == null || Boolean.parseBoolean((String) syncAllParam.getValue());

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java?rev=611371&r1=611370&r2=611371&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java Fri Jan 11 22:24:54 2008
@@ -70,14 +70,4 @@
      */
     void setConfigurationContext(ConfigurationContext configurationContext);
 
-    /**
-     * Method to check whether all members in the cluster have to be kep in sync at all times.
-     * Typically, this will require each member in the cluster to ACKnowledge receipt of a
-     * particular message, which may have a significant performance hit.
-     *
-     * @return true - if all members in the cluster should be kept in sync at all times,
-     *         false otherwise
-     */
-    boolean synchronizeAllMembers();
-
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org