You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/03 01:04:22 UTC

svn commit: r382577 - in /tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes: mcast/McastMember.java tcp/ReplicationTransmitter.java tcp/nio/NioSender.java tcp/nio/ParallelNioSender.java tcp/nio/PooledParallelSender.java

Author: fhanik
Date: Thu Mar  2 16:04:20 2006
New Revision: 382577

URL: http://svn.apache.org/viewcvs?rev=382577&view=rev
Log:
First version of the parallel sender is complete

Added:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java
Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java?rev=382577&r1=382576&r2=382577&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java Thu Mar  2 16:04:20 2006
@@ -181,7 +181,6 @@
         
         System.arraycopy(domaind,0,data,20,domaind.length);
         dataPkg = data;
-        System.out.println("McastMember.getData all the way");
         return data;
     }
     /**

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=382577&r1=382576&r2=382577&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java Thu Mar  2 16:04:20 2006
@@ -29,6 +29,7 @@
 import org.apache.catalina.tribes.util.IDynamicProperty;
 import org.apache.catalina.util.StringManager;
 import org.apache.tomcat.util.IntrospectionUtils;
+import org.apache.catalina.tribes.tcp.nio.PooledParallelSender;
 
 /**
  * Transmit message to other cluster members
@@ -154,7 +155,7 @@
     /**
      * @return The ack timeout
      */
-    public long getTimeout() {
+    public long getAckTimeout() {
         return ackTimeout;
     }
 
@@ -267,17 +268,38 @@
      * @see org.apache.catalina.tribes.ClusterSender#sendMessage(org.apache.catalina.tribes.ClusterMessage, org.apache.catalina.tribes.Member)
      */
     public void sendMessage(ChannelMessage message, Member[] destination) throws ChannelException {
-        ChannelException exception = null;
-        for (int i = 0; i < destination.length; i++) {
-            try {
-                sendMessage(message, destination[i]);
-            } catch (Exception x) {
-                if (exception == null) exception = new ChannelException(x);
-                exception.addFaultyMember(destination[i]);
+        if ( !isParallel() ) {
+            ChannelException exception = null;
+            for (int i = 0; i < destination.length; i++) {
+                try {
+                    sendMessage(message, destination[i]);
+                } catch (Exception x) {
+                    if (exception == null) exception = new ChannelException(x);
+                    exception.addFaultyMember(destination[i]);
+                }
             }
+            if (exception != null)throw exception;
+        } else {
+            MultiPointSender sender = getParallelSender();
+            sender.sendMessage(destination,message);
         }
-        if (exception != null)throw exception;
+    }
+    
+    PooledParallelSender parallelsender = null;
+    public MultiPointSender getParallelSender() {
+        if ( parallelsender == null ) {
+
 
+            PooledParallelSender sender = new PooledParallelSender();
+            sender.setMaxRetryAttempts(2);
+            sender.setRxBufSize(getRxBufSize());
+            sender.setTimeout(ackTimeout);
+            sender.setUseDirectBuffer(true);
+            sender.setWaitForAck(getWaitForAck());
+            sender.setTxBufSize(getTxBufSize());
+            parallelsender = sender;
+        }
+        return parallelsender;
     }
     
     public void sendMessage(ChannelMessage message, Member destination) throws ChannelException {       
@@ -365,14 +387,16 @@
      */
     public synchronized void add(Member member) {
         try {
-            Object key = getKey(member);
-            if (!map.containsKey(key)) {
-                SinglePointSender sender = DataSenderFactory.getSingleSender(replicationMode, member);
-                if ( sender!= null ) {
-                    transferSenderProperty(sender);
-                    sender.setRxBufSize(getRxBufSize());
-                    sender.setTxBufSize(getTxBufSize());
-                    map.put(key, sender);
+            if ( !isParallel() ) {
+                Object key = getKey(member);
+                if (!map.containsKey(key)) {
+                    SinglePointSender sender = DataSenderFactory.getSingleSender(replicationMode, member);
+                    if (sender != null) {
+                        transferSenderProperty(sender);
+                        sender.setRxBufSize(getRxBufSize());
+                        sender.setTxBufSize(getTxBufSize());
+                        map.put(key, sender);
+                    }
                 }
             }
         } catch (java.io.IOException x) {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java?rev=382577&r1=382576&r2=382577&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java Thu Mar  2 16:04:20 2006
@@ -189,7 +189,6 @@
         socketChannel.configureBlocking(false);
         socketChannel.connect(addr);
         socketChannel.register(getSelector(),SelectionKey.OP_CONNECT,this);
-        this.connected = true;
     }
     
 

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java?rev=382577&r1=382576&r2=382577&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java Thu Mar  2 16:04:20 2006
@@ -77,6 +77,7 @@
         NioSender[] senders = setupForSend(destination);
         connect(senders);
         setData(senders,data);
+        
         int remaining = senders.length;
         try {
             //loop until complete, an error happens, or we timeout
@@ -103,9 +104,11 @@
     private int doLoop(long selectTimeOut, int maxAttempts) throws IOException, ChannelException {
         int completed = 0;
         int selectedKeys = selector.select(selectTimeOut);
+        
         if (selectedKeys == 0) {
             return 0;
         }
+        
         Iterator it = selector.selectedKeys().iterator();
         while (it.hasNext()) {
             SelectionKey sk = (SelectionKey) it.next();
@@ -261,7 +264,8 @@
     }
     
     public boolean checkKeepAlive() {
-        throw new UnsupportedOperationException("Method ParallelNioSender.checkKeepAlive() not implemented");
+        //throw new UnsupportedOperationException("Method ParallelNioSender.checkKeepAlive() not implemented");
+        return false;
     }
 
 }

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java?rev=382577&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java Thu Mar  2 16:04:20 2006
@@ -0,0 +1,78 @@
+package org.apache.catalina.tribes.tcp.nio;
+
+import org.apache.catalina.tribes.tcp.PooledSender;
+import org.apache.catalina.tribes.tcp.DataSender;
+import org.apache.catalina.tribes.tcp.MultiPointSender;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelMessage;
+import java.io.IOException;
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class PooledParallelSender extends PooledSender implements MultiPointSender{
+    private boolean suspect;
+    private boolean useDirectBuffer;
+    private int maxRetryAttempts;
+
+    public PooledParallelSender() {
+        super(25);
+    }
+    public void sendMessage(Member[] destination, ChannelMessage message) throws ChannelException {
+        ParallelNioSender sender = (ParallelNioSender)getSender();
+        try {
+            sender.sendMessage(destination, message);
+        }finally {
+            returnSender(sender);
+        }
+    }
+
+    public DataSender getNewDataSender() {
+        try {
+            ParallelNioSender sender = 
+                new ParallelNioSender(getTimeout(), 
+                                      getWaitForAck(), 
+                                      getMaxRetryAttempts(), 
+                                      useDirectBuffer,
+                                      getRxBufSize(), 
+                                      getTxBufSize());
+            return sender;
+        } catch ( IOException x ) {
+            throw new IllegalStateException("Unable to open NIO selector.",x);
+        }
+    }
+
+    public void setSuspect(boolean suspect) {
+        this.suspect = suspect;
+    }
+
+    public void setUseDirectBuffer(boolean useDirectBuffer) {
+        this.useDirectBuffer = useDirectBuffer;
+    }
+
+    public void setMaxRetryAttempts(int maxRetryAttempts) {
+        this.maxRetryAttempts = maxRetryAttempts;
+    }
+
+    public boolean getSuspect() {
+        return suspect;
+    }
+
+    public boolean getUseDirectBuffer() {
+        return useDirectBuffer;
+    }
+
+    public int getMaxRetryAttempts() {
+        return maxRetryAttempts;
+    }
+}
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org