You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/13 22:33:05 UTC

svn commit: r385661 - in /tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp: ./ bio/ nio/

Author: fhanik
Date: Mon Mar 13 13:33:03 2006
New Revision: 385661

URL: http://svn.apache.org/viewcvs?rev=385661&view=rev
Log:
Working on simplicity, removing all complex code, synchronization should be simple, but ideally, there should be none, two threads should never try to access the same socket

Added:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
      - copied, changed from r385654, tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
Removed:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java
Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultiSocketSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=385661&r1=385660&r2=385661&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java Mon Mar 13 13:33:03 2006
@@ -35,7 +35,7 @@
     public boolean isConnected();
     public void setRxBufSize(int size);
     public void setTxBufSize(int size);
-    public boolean checkKeepAlive();
+    public boolean keepalive();
     public void setTimeout(long timeout);
     public void setWaitForAck(boolean isWaitForAck);
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java?rev=385661&r1=385660&r2=385661&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java Mon Mar 13 13:33:03 2006
@@ -52,7 +52,7 @@
     }
     
     public void returnSender(DataSender sender) {
-        sender.checkKeepAlive();
+        sender.keepalive();
         queue.returnSender(sender);
     }
     
@@ -125,7 +125,7 @@
         return poolSize;
     }
 
-    public boolean checkKeepAlive() {
+    public boolean keepalive() {
         //do nothing, the pool checks on every return
         return false;
     }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=385661&r1=385660&r2=385661&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java Mon Mar 13 13:33:03 2006
@@ -128,18 +128,8 @@
      */
 
     public void heartbeat() {
-        checkKeepAlive();
+        
     }
-
-    /**
-     * Check all DataSender Socket to close socket at keepAlive mode
-     * @see DataSender#checkKeepAlive()
-     */
-    public void checkKeepAlive() {
-        getTransport().checkKeepAlive();
-    }
-
-    
 
     /**
      * add new cluster member and create sender ( s. replicationMode) transfer

Copied: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java (from r385654, tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java)
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java?p2=tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java&p1=tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java&r1=385654&r2=385661&rev=385661&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java Mon Mar 13 13:33:03 2006
@@ -17,15 +17,12 @@
 package org.apache.catalina.tribes.tcp.bio;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.net.SocketException;
 import java.util.Arrays;
 
 import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.io.ClusterData;
 import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.catalina.tribes.tcp.Constants;
 import org.apache.catalina.tribes.tcp.DataSender;
@@ -41,9 +38,9 @@
  * @version $Revision: 377484 $ $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $
  * @since 5.5.16
  */
-public class SinglePointDataSender implements DataSender {
+public class BioSender implements DataSender {
 
-    private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(SinglePointDataSender.class);
+    private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(BioSender.class);
 
     /**
      * The string manager for this package.
@@ -69,11 +66,6 @@
 
     
     /**
-     * cluster domain
-     */
-    private String domain;
-
-    /**
      * current sender socket
      */
     private Socket socket = null;
@@ -84,11 +76,6 @@
     private boolean isSocketConnected = false;
 
     /**
-     * Message transfer over socket ?
-     */
-    private boolean isMessageTransferStarted = false;
-
-    /**
      * sender is in suspect state (last transfer failed)
      */
     private SenderState senderState = new SenderState();
@@ -147,20 +134,19 @@
 
     // ------------------------------------------------------------- Constructor
     
-    public SinglePointDataSender(String domain,InetAddress host, int port) {
+    public BioSender(InetAddress host, int port) {
         this.address = host;
         this.port = port;
-        this.domain = domain;
         if (log.isDebugEnabled())
             log.debug(sm.getString("IDataSender.create",address, new Integer(port)));
     }
 
-    public SinglePointDataSender(String domain,InetAddress host, int port, SenderState state) {
-        this(domain,host,port);
+    public BioSender(InetAddress host, int port, SenderState state) {
+        this(host,port);
         if ( state != null ) this.senderState = state;
     }
-    public SinglePointDataSender(String domain,InetAddress host, int port, SenderState state, int rxBufSize, int txBufSize) {
-        this(domain,host,port,state);
+    public BioSender(InetAddress host, int port, SenderState state, int rxBufSize, int txBufSize) {
+        this(host,port,state);
         this.rxBufSize = rxBufSize;
         this.txBufSize = txBufSize;
     }
@@ -173,9 +159,7 @@
      * <code>&lt;description&gt;/&lt;version&gt;</code>.
      */
     public String getInfo() {
-
         return (info);
-
     }
 
     
@@ -202,39 +186,10 @@
         return port;
     }
 
-    /**
-     * @return Returns the domain.
-     */
-    public String getDomain() {
-        return domain;
-    }
-    
-    /**
-     * @param domain The domain to set.
-     */
-    public void setDomain(String domain) {
-        this.domain = domain;
-    }
-    
     public boolean isConnected() {
         return isSocketConnected;
     }
 
-    /**
-     * @return Is DataSender send a message
-     */
-    public boolean isMessageTransferStarted() {
-        return isMessageTransferStarted;
-    }
-    
-    /**
-     * @param isSocketConnected
-     *            The isSocketConnected to set.
-     */
-    protected void setSocketConnected(boolean isSocketConnected) {
-        this.isSocketConnected = isSocketConnected;
-    }
-
     public boolean isSuspect() {
         return senderState.isSuspect() || senderState.isFailing();
     }
@@ -315,12 +270,6 @@
     public void setResend(boolean resend) {
         this.resend = resend;
     }
-    /**
-     * @return Returns the socket.
-     */
-    public Socket getSocket() {
-        return socket;
-    }
 
     public SenderState getSenderState() {
         return senderState;
@@ -334,13 +283,6 @@
         return txBufSize;
     }
 
-    /**
-     * @param socket The socket to set.
-     */
-    public void setSocket(Socket socket) {
-        this.socket = socket;
-    }
-
     public void setRxBufSize(int rxBufSize) {
         this.rxBufSize = rxBufSize;
     }
@@ -356,15 +298,11 @@
      * @see org.apache.catalina.tribes.tcp.IDataSender#connect()
      */
     public synchronized void connect() throws ChannelException {
-        if(!isMessageTransferStarted) {
+        try {
             openSocket();
-            if(isConnected()) {
-                if (log.isDebugEnabled())
-                    log.debug(sm.getString("IDataSender.connect", address.getHostAddress(),new Integer(port),new Long(0)));
-            }
-        } else 
-            if (log.isWarnEnabled())
-               log.warn(sm.getString("IDataSender.message.create", address.getHostAddress(),new Integer(port)));
+        }catch ( Exception x ) {
+            throw new ChannelException(x);
+        }
    }
 
  
@@ -374,16 +312,12 @@
      * @see IDataSender#disconnect()
      */
     public synchronized void disconnect() {
-        if(!isMessageTransferStarted) {
             boolean connect = isConnected() ;
             closeSocket();
             if(connect) {
                 if (log.isDebugEnabled())
                     log.debug(sm.getString("IDataSender.disconnect", address.getHostAddress(),new Integer(port),new Long(0)));
             }
-        } else 
-            if (log.isWarnEnabled())
-               log.warn(sm.getString("IDataSender.message.disconnect", address.getHostAddress(),new Integer(port)));
         
     }
 
@@ -395,18 +329,15 @@
      * @return true, is socket close
      * @see DataSender#closeSocket()
      */
-    public synchronized boolean checkKeepAlive() {
+    public synchronized boolean keepalive() {
         boolean isCloseSocket = true ;
-        if(!isMessageTransferStarted) {
-            if(isConnected()) {
-                if ((keepAliveTimeout > -1 && (System.currentTimeMillis() - keepAliveConnectTime) > keepAliveTimeout)
-                    || (keepAliveMaxRequestCount > -1 && keepAliveCount >= keepAliveMaxRequestCount)) {
-                        closeSocket();
-               } else
-                    isCloseSocket = false ;
-            }
-        } else
-            isCloseSocket = false ;
+        if(isConnected()) {
+            if ((keepAliveTimeout > -1 && (System.currentTimeMillis() - keepAliveConnectTime) > keepAliveTimeout)
+                || (keepAliveMaxRequestCount > -1 && keepAliveCount >= keepAliveMaxRequestCount)) {
+                    closeSocket();
+           } else
+                isCloseSocket = false ;
+        }
         
         return isCloseSocket;
     }
@@ -417,14 +348,8 @@
      * @see org.apache.catalina.tribes.tcp.IDataSender#sendMessage(,
      *      ChannelMessage)
      */
-    public synchronized void sendMessage(ChannelMessage data) throws ChannelException {
-        try {
-            pushMessage(data);
-        }catch ( Exception x ) {
-            ChannelException cx = new ChannelException(x);
-            cx.addFaultyMember(data.getAddress());
-            throw cx;
-        }
+    public synchronized void sendMessage(byte[] data) throws IOException {
+        pushMessage(data);
     }
 
     
@@ -444,7 +369,7 @@
      * open real socket and set time out when waitForAck is enabled
      * is socket open return directly
      */
-    protected void openSocket() throws ChannelException {
+    protected synchronized void openSocket() throws IOException {
        if(isConnected())
            return ;
        try {
@@ -459,7 +384,7 @@
           getSenderState().setSuspect();
           if (log.isDebugEnabled())
               log.debug(sm.getString("IDataSender.openSocket.failure",address.getHostAddress(), new Integer(port),new Long(0)), ex1);
-          throw new ChannelException(ex1);
+          throw (ex1);
         }
         
      }
@@ -468,7 +393,7 @@
      * @throws IOException
      * @throws SocketException
      */
-    protected void createSocket() throws IOException, SocketException {
+    protected synchronized void createSocket() throws IOException, SocketException {
         socket = new Socket(getAddress(), getPort());
         socket.setSendBufferSize(getTxBufSize());
         socket.setReceiveBufferSize(getRxBufSize());
@@ -481,7 +406,7 @@
      * @see DataSender#disconnect()
      * @see DataSender#closeSocket()
      */
-    protected void closeSocket() {
+    protected synchronized void closeSocket() {
         if(isConnected()) {
              if (socket != null) {
                 try {
@@ -516,28 +441,22 @@
      * @since 5.5.10
      */
     
-    protected void pushMessage(ChannelMessage data, boolean reconnect) throws ChannelException {
-        synchronized(this) {
-            checkKeepAlive();
-            if ( reconnect ) closeSocket();
-            if (!isConnected()) openSocket();
-            else if(keepAliveTimeout > -1) this.keepAliveConnectTime = System.currentTimeMillis();
-        }
-        try {
-            writeData(data);
-        } catch ( IOException x ) {
-            throw new ChannelException(x);
-        }
+    protected synchronized void pushMessage(byte[] data, boolean reconnect) throws IOException {
+        keepalive();
+        if ( reconnect ) closeSocket();
+        if (!isConnected()) openSocket();
+        else if(keepAliveTimeout > -1) this.keepAliveConnectTime = System.currentTimeMillis();
+        writeData(data);
     }
     
-    protected void pushMessage( ChannelMessage data) throws ChannelException {
+    protected synchronized void pushMessage( byte[] data) throws IOException {
         boolean messageTransfered = false ;
-        ChannelException exception = null;
+        IOException exception = null;
         try {
              // first try with existing connection
              pushMessage(data,false);
              messageTransfered = true ;
-        } catch (ChannelException x) {
+        } catch (IOException x) {
             exception = x;
             //resend
             if (log.isTraceEnabled()) log.trace(sm.getString("IDataSender.send.again", address.getHostAddress(),new Integer(port)),x);
@@ -546,17 +465,17 @@
                 pushMessage(data,true);                    
                 messageTransfered = true;
                 exception = null;
-            } catch (ChannelException xx) {
+            } catch (IOException xx) {
                 exception = xx;
                 closeSocket();
             }
         } finally {
             this.keepAliveCount++;
-            checkKeepAlive();
+            keepalive();
             if(messageTransfered) {
-                if (log.isTraceEnabled()) log.trace(sm.getString("IDataSender.send.message", address.getHostAddress(),new Integer(port), data.getUniqueId(), new Long(data.getMessage().getLength())));
+                
             } else {
-                if ( exception != null ) throw new ChannelException(exception);
+                if ( exception != null ) throw exception;
             }
         }
     }
@@ -568,19 +487,10 @@
      * @throws IOException
      * @since 5.5.10
      */
-    protected void writeData(ChannelMessage data) throws IOException { 
-        synchronized(this) {
-            isMessageTransferStarted = true ;
-        }
-        try {
-            socket.getOutputStream().write(XByteBuffer.createDataPackage((ClusterData)data));
-            socket.getOutputStream().flush();
-            if (getWaitForAck()) waitForAck();
-        } finally {
-            synchronized(this) {
-                isMessageTransferStarted = false ;
-            }
-        }
+    protected synchronized void writeData(byte[] data) throws IOException { 
+        socket.getOutputStream().write(data);
+        socket.getOutputStream().flush();
+        if (getWaitForAck()) waitForAck();
     }
 
     /**

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultiSocketSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultiSocketSender.java?rev=385661&r1=385660&r2=385661&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultiSocketSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultiSocketSender.java Mon Mar 13 13:33:03 2006
@@ -23,6 +23,7 @@
 import org.apache.catalina.tribes.tcp.DataSender;
 import org.apache.catalina.tribes.tcp.PooledSender;
 import org.apache.catalina.tribes.tcp.SenderState;
+import java.io.IOException;
 
 /**
  * Send cluster messages with a pool of sockets (25).
@@ -158,14 +159,14 @@
      * @param data Message data
      * @throws java.io.IOException
      */
-    public void sendMessage(ChannelMessage data) throws ChannelException {
+    public void sendMessage(byte[] data) throws IOException, ChannelException {
         //get a socket sender from the pool
         if(!isConnected()) {
             synchronized(this) {
                 if(!isConnected()) connect();
             }
         }
-        SinglePointDataSender sender = (SinglePointDataSender)getSender();
+        BioSender sender = (BioSender)getSender();
         if (sender == null) {
             log.warn("Sender queue is empty. Can not send any messages.");
             return;
@@ -187,7 +188,7 @@
 
     public DataSender getNewDataSender() {
         //new DataSender(
-            SinglePointDataSender sender = new SinglePointDataSender(getDomain(),
+            BioSender sender = new BioSender(
                                                getHost(),
                                                getPort(),
                                                getSenderState() );

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java?rev=385661&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java Mon Mar 13 13:33:03 2006
@@ -0,0 +1,194 @@
+package org.apache.catalina.tribes.tcp.bio;
+
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.tcp.MultiPointSender;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import java.nio.channels.Selector;
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.catalina.tribes.tcp.nio.NioSender;
+import java.util.Iterator;
+import org.apache.catalina.tribes.io.ClusterData;
+import java.net.InetAddress;
+import org.apache.catalina.tribes.tcp.SenderState;
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class MultipointBioSender implements MultiPointSender {
+    public MultipointBioSender() {
+    }
+    
+    protected long timeout = 15000;
+    protected long selectTimeout = 1000; 
+    protected boolean waitForAck = false;
+    protected int retryAttempts=0;
+    protected int keepAliveCount = Integer.MAX_VALUE;
+    protected HashMap bioSenders = new HashMap();
+    protected boolean directBuf = false;
+    protected int rxBufSize = 43800;
+    protected int txBufSize = 25188;
+    protected boolean suspect = false;
+    private boolean connected;
+    private boolean autoConnect;
+
+    public synchronized void sendMessage(Member[] destination, ChannelMessage msg) throws ChannelException {
+        long start = System.currentTimeMillis();
+        byte[] data = XByteBuffer.createDataPackage((ClusterData)msg);
+        BioSender[] senders = setupForSend(destination);
+        ChannelException cx = null;
+        for ( int i=0; i<senders.length; i++ ) {
+            try {
+                senders[i].sendMessage(data);
+            } catch (Exception x) {
+                if (cx == null) cx = new ChannelException(x);
+                cx.addFaultyMember(destination[i]);
+            }
+        }
+        if (cx!=null ) throw cx;
+    }
+
+
+
+    private BioSender[] setupForSend(Member[] destination) throws ChannelException {
+        ChannelException cx = null;
+        BioSender[] result = new BioSender[destination.length];
+        for ( int i=0; i<destination.length; i++ ) {
+            try {
+                BioSender sender = (BioSender) bioSenders.get(destination[i]);
+                if (sender == null) {
+                    InetAddress dest = InetAddress.getByAddress(destination[i].getHost());
+                    sender = new BioSender(dest, destination[i].getPort(), new SenderState(), rxBufSize, txBufSize);
+                    bioSenders.put(destination[i], sender);
+                }
+                sender.setWaitForAck(waitForAck);
+                result[i] = sender;
+                if (!result[i].isConnected() ) result[i].connect();
+                result[i].keepalive();
+            }catch (Exception x ) {
+                if ( cx== null ) cx = new ChannelException(x);
+                cx.addFaultyMember(destination[i]);
+            }
+        }
+        if ( cx!=null ) throw cx;
+        else return result;
+    }
+
+    public void connect() {
+        //do nothing, we connect on demand
+        setConnected(true);
+    }
+
+
+    private synchronized void close() throws ChannelException  {
+        ChannelException x = null;
+        Object[] members = bioSenders.keySet().toArray();
+        for (int i=0; i<members.length; i++ ) {
+            Member mbr = (Member)members[i];
+            try {
+                NioSender sender = (NioSender)bioSenders.get(mbr);
+                sender.disconnect();
+            }catch ( Exception e ) {
+                if ( x == null ) x = new ChannelException(e);
+                x.addFaultyMember(mbr);
+            }
+            bioSenders.remove(mbr);
+        }
+        if ( x != null ) throw x;
+    }
+
+    public void memberAdded(Member member) {
+
+    }
+
+    public void memberRemoved(Member member) {
+        //disconnect senders
+        NioSender sender = (NioSender)bioSenders.remove(member);
+        if ( sender != null ) sender.disconnect();
+    }
+
+
+    public synchronized void disconnect() {
+        try {close(); }catch (Exception x){}
+        setConnected(false);
+    }
+
+    public void finalize() {
+        try {disconnect(); }catch ( Exception ignore){}
+    }
+
+    public boolean getSuspect() {
+        return suspect;
+    }
+
+    public boolean isConnected() {
+        return connected;
+    }
+
+    public boolean isAutoConnect() {
+        return autoConnect;
+    }
+
+    public void setSuspect(boolean suspect) {
+        this.suspect = suspect;
+    }
+
+    public void setUseDirectBuffer(boolean directBuf) {
+        this.directBuf = directBuf;
+    }
+
+    public void setMaxRetryAttempts(int attempts) {
+        this.retryAttempts = attempts;
+    }
+
+    public void setTxBufSize(int size) {
+        this.txBufSize = size;
+    }
+
+    public void setRxBufSize(int size) {
+        this.rxBufSize = size;
+    }
+
+    public void setWaitForAck(boolean wait) {
+        this.waitForAck = wait;
+    }
+
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    public void setConnected(boolean connected) {
+        this.connected = connected;
+    }
+
+    public void setAutoConnect(boolean autoConnect) {
+        this.autoConnect = autoConnect;
+    }
+
+    public boolean keepalive() {
+        //throw new UnsupportedOperationException("Method ParallelNioSender.checkKeepAlive() not implemented");
+        boolean result = false;
+        Map.Entry[] entries = (Map.Entry[])bioSenders.entrySet().toArray(new Map.Entry[bioSenders.size()]);
+        for ( int i=0; i<entries.length; i++ ) {
+            NioSender sender = (NioSender)entries[i].getValue();
+            if ( sender.checkKeepAlive() ) {
+                bioSenders.remove(sender.getDestination());
+            }
+        }
+        return result;
+    }
+
+}
\ No newline at end of file

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java?rev=385661&r1=385660&r2=385661&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java Mon Mar 13 13:33:03 2006
@@ -273,7 +273,7 @@
         this.autoConnect = autoConnect;
     }
 
-    public boolean checkKeepAlive() {
+    public boolean keepalive() {
         //throw new UnsupportedOperationException("Method ParallelNioSender.checkKeepAlive() not implemented");
         boolean result = false;
         Map.Entry[] entries = (Map.Entry[])nioSenders.entrySet().toArray(new Map.Entry[nioSenders.size()]);



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org