You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/02 21:32:48 UTC

svn commit: r382506 - in /tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp: ./ bio/ nio/

Author: fhanik
Date: Thu Mar  2 12:32:47 2006
New Revision: 382506

URL: http://svn.apache.org/viewcvs?rev=382506&view=rev
Log:
More refactoring and cleanup

Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/FastAsyncSocketSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=382506&r1=382505&r2=382506&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java Thu Mar  2 12:32:47 2006
@@ -15,6 +15,8 @@
  */
 package org.apache.catalina.tribes.tcp;
 
+import org.apache.catalina.tribes.ChannelException;
+
 /**
  * <p>Title: </p>
  *
@@ -28,7 +30,7 @@
  * @version 1.0
  */
 public interface DataSender {
-    public void connect() throws java.io.IOException;
+    public void connect() throws ChannelException;
     public void disconnect();
 
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=382506&r1=382505&r2=382506&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java Thu Mar  2 12:32:47 2006
@@ -280,7 +280,7 @@
 
     }
     
-    public void sendMessage(ChannelMessage message, Member destination) throws IOException {       
+    public void sendMessage(ChannelMessage message, Member destination) throws ChannelException {       
         Object key = getKey(destination);
         SinglePointSender sender = (SinglePointSender) map.get(key);
         sendMessageData(message, sender);
@@ -435,7 +435,7 @@
      * @throws java.io.IOException If an error occurs
      */
     protected void sendMessageData(ChannelMessage data,
-                                   SinglePointSender sender) throws IOException {
+                                   SinglePointSender sender) throws ChannelException {
         if (sender == null)
             throw new RuntimeException("Sender not available. Make sure sender information is available to the ReplicationTransmitter.");
         try {
@@ -447,7 +447,7 @@
             }
             sender.sendMessage(data);
             sender.setSuspect(false);
-        } catch (IOException x) {
+        } catch (ChannelException x) {
             if (!sender.getSuspect()) {
                 if (log.isErrorEnabled() ) log.error("Unable to send replicated message, is member ["+sender.toString()+"] down?",x);
             } else if (log.isDebugEnabled() ) {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java?rev=382506&r1=382505&r2=382506&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java Thu Mar  2 12:32:47 2006
@@ -17,6 +17,7 @@
 package org.apache.catalina.tribes.tcp;
 
 import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.ChannelException;
 
 /**
  * @author Filip Hanik
@@ -31,7 +32,7 @@
     public java.net.InetAddress getAddress();
     public void setPort(int port);
     public int getPort();
-    public void sendMessage(ChannelMessage data) throws java.io.IOException;
+    public void sendMessage(ChannelMessage data) throws ChannelException;
     public boolean isConnected();
     public void setSuspect(boolean suspect);
     public boolean getSuspect();

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/FastAsyncSocketSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/FastAsyncSocketSender.java?rev=382506&r1=382505&r2=382506&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/FastAsyncSocketSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/FastAsyncSocketSender.java Thu Mar  2 12:32:47 2006
@@ -24,6 +24,7 @@
 import org.apache.catalina.tribes.util.LinkObject;
 import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.catalina.tribes.tcp.*;
+import org.apache.catalina.tribes.ChannelException;
 
 /**
  * Send cluster messages from a Message queue with only one socket. Ack and keep
@@ -281,7 +282,7 @@
      * 
      * @see org.apache.catalina.tribes.tcp.IDataSender#connect()
      */
-    public void connect() throws java.io.IOException {
+    public void connect() throws ChannelException {
         super.connect();
         checkThread();
         if(!queue.isEnabled())
@@ -314,8 +315,7 @@
      * 
      * @see org.apache.catalina.tribes.tcp.DataSender#pushMessage(ChannelMessage)
      */
-    public void sendMessage(ChannelMessage data)
-            throws java.io.IOException {
+    public void sendMessage(ChannelMessage data) throws ChannelException {
         queue.add(getUniqueIdAsString(data.getUniqueId()), data);
         synchronized (this) {
             inQueueCounter++;
@@ -328,15 +328,6 @@
                             data.getMessage().getLength())));
     }
 
-    /**
-     * Reset sender statistics
-     */
-    public synchronized void resetStatistics() {
-        super.resetStatistics();
-        inQueueCounter = queue.getSize();
-        outQueueCounter = 0;
-        queue.resetStatistics();
-    }
 
     /**
      * Name of this SockerSender

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java?rev=382506&r1=382505&r2=382506&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java Thu Mar  2 12:32:47 2006
@@ -19,14 +19,13 @@
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.LinkedList;
+
 import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.tcp.*;
+import org.apache.catalina.tribes.ChannelException;
 
 /**
  * Send cluster messages with a pool of sockets (25).
  * 
- * FIXME support processing stats
- * 
  * @author Filip Hanik
  * @author Peter Rossbach
  * @version 1.2
@@ -92,17 +91,15 @@
 
     //  ----------------------------------------------------- Public Methode
 
-    public synchronized void connect() throws java.io.IOException {
+    public synchronized void connect() throws ChannelException {
         //do nothing, happens in the socket sender itself
         senderQueue.open();
         setSocketConnected(true);
-        connectCounter++;
     }
 
     public synchronized void disconnect() {
         senderQueue.close();
         setSocketConnected(false);
-        disconnectCounter++;
     }
 
     /**
@@ -112,7 +109,7 @@
      * @param data Message data
      * @throws java.io.IOException
      */
-    public void sendMessage(ChannelMessage data) throws IOException {
+    public void sendMessage(ChannelMessage data) throws ChannelException {
         //get a socket sender from the pool
         if(!isConnected()) {
             synchronized(this) {
@@ -132,7 +129,6 @@
             //return the connection to the pool
             senderQueue.returnSender(sender);
         }
-        addStats(data.getMessage().getLength());
     }
 
     public String toString() {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java?rev=382506&r1=382505&r2=382506&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java Thu Mar  2 12:32:47 2006
@@ -28,6 +28,7 @@
 import org.apache.catalina.util.StringManager;
 import java.util.Arrays;
 import org.apache.catalina.tribes.tcp.*;
+import org.apache.catalina.tribes.ChannelException;
 
 /**
  * Send cluster messages with only one socket. Ack and keep Alive Handling is
@@ -97,79 +98,10 @@
     private long ackTimeout;
 
     /**
-     * number of requests
-     */
-    protected long nrOfRequests = 0;
-
-    /**
-     * total bytes to transfer
-     */
-    protected long totalBytes = 0;
-
-    /**
-     * number of connects
-     */
-    protected long connectCounter = 0;
-
-    /**
-     * number of explizit disconnects
-     */
-    protected long disconnectCounter = 0;
-
-    /**
-     * number of failing acks
-     */
-    protected long missingAckCounter = 0;
-
-    /**
-     * number of data resends (second trys after socket failure)
-     */
-    protected long dataResendCounter = 0;
-
-    /**
-     * number of data failure sends 
-     */
-    protected long dataFailureCounter = 0;
-    
-    /**
-     * doProcessingStats
-     */
-    protected boolean doProcessingStats = false;
-
-    /**
-     * proessingTime
-     */
-    protected long processingTime = 0;
-    
-    /**
-     * min proessingTime
-     */
-    protected long minProcessingTime = Long.MAX_VALUE ;
-
-    /**
-     * max proessingTime
-     */
-    protected long maxProcessingTime = 0;
-   
-    /**
-     * doWaitAckStats
-     */
-    protected boolean doWaitAckStats = false;
-
-    /**
      * waitAckTime
      */
     protected long waitAckTime = 0;
     
-    /**
-     * min waitAckTime
-     */
-    protected long minWaitAckTime = Long.MAX_VALUE ;
-
-    /**
-     * max waitAckTime
-     */
-    protected long maxWaitAckTime = 0;
 
     /**
      * keep socket open for no more than one min
@@ -197,21 +129,6 @@
     private boolean waitForAck = false;
 
     /**
-     * number of socket close
-     */
-    private int socketCloseCounter = 0 ;
-
-    /**
-     * number of socket open
-     */
-    private int socketOpenCounter = 0 ;
-
-    /**
-     * number of socket open failures
-     */
-    private int socketOpenFailureCounter = 0 ;
-
-    /**
      * After failure make a resend
      */
     private boolean resend = false ;
@@ -260,167 +177,6 @@
 
     }
 
-    /**
-     * @return Returns the nrOfRequests.
-     */
-    public long getNrOfRequests() {
-        return nrOfRequests;
-    }
-
-    /**
-     * @return Returns the totalBytes.
-     */
-    public long getTotalBytes() {
-        return totalBytes;
-    }
-
-    /**
-     * @return Returns the avg totalBytes/nrOfRequests.
-     */
-    public long getAvgMessageSize() {
-        return totalBytes / nrOfRequests;
-    }
-
-    /**
-     * @return Returns the avg processingTime/nrOfRequests.
-     */
-    public double getAvgProcessingTime() {
-        return ((double)processingTime) / nrOfRequests;
-    }
- 
-    /**
-     * @return Returns the maxProcessingTime.
-     */
-    public long getMaxProcessingTime() {
-        return maxProcessingTime;
-    }
-    
-    /**
-     * @return Returns the minProcessingTime.
-     */
-    public long getMinProcessingTime() {
-        return minProcessingTime;
-    }
-    
-    /**
-     * @return Returns the processingTime.
-     */
-    public long getProcessingTime() {
-        return processingTime;
-    }
-    
-    /**
-     * @return Returns the doProcessingStats.
-     */
-    public boolean isDoProcessingStats() {
-        return doProcessingStats;
-    }
-    
-    /**
-     * @param doProcessingStats The doProcessingStats to set.
-     */
-    public void setDoProcessingStats(boolean doProcessingStats) {
-        this.doProcessingStats = doProcessingStats;
-    }
- 
- 
-    /**
-     * @return Returns the doWaitAckStats.
-     */
-    public boolean isDoWaitAckStats() {
-        return doWaitAckStats;
-    }
-    
-    /**
-     * @param doWaitAckStats The doWaitAckStats to set.
-     */
-    public void setDoWaitAckStats(boolean doWaitAckStats) {
-        this.doWaitAckStats = doWaitAckStats;
-    }
-    
-    /**
-     * @return Returns the avg waitAckTime/nrOfRequests.
-     */
-    public double getAvgWaitAckTime() {
-        return ((double)waitAckTime) / nrOfRequests;
-    }
- 
-    /**
-     * @return Returns the maxWaitAckTime.
-     */
-    public long getMaxWaitAckTime() {
-        return maxWaitAckTime;
-    }
-    
-    /**
-     * @return Returns the minWaitAckTime.
-     */
-    public long getMinWaitAckTime() {
-        return minWaitAckTime;
-    }
-    
-    /**
-     * @return Returns the waitAckTime.
-     */
-    public long getWaitAckTime() {
-        return waitAckTime;
-    }
-    
-    /**
-     * @return Returns the connectCounter.
-     */
-    public long getConnectCounter() {
-        return connectCounter;
-    }
-
-    /**
-     * @return Returns the disconnectCounter.
-     */
-    public long getDisconnectCounter() {
-        return disconnectCounter;
-    }
-
-    /**
-     * @return Returns the missingAckCounter.
-     */
-    public long getMissingAckCounter() {
-        return missingAckCounter;
-    }
-
-    /**
-     * @return Returns the socketOpenCounter.
-     */
-    public int getSocketOpenCounter() {
-        return socketOpenCounter;
-    }
-    
-    /**
-     * @return Returns the socketOpenFailureCounter.
-     */
-    public int getSocketOpenFailureCounter() {
-        return socketOpenFailureCounter;
-    }
-
-    /**
-     * @return Returns the socketCloseCounter.
-     */
-    public int getSocketCloseCounter() {
-        return socketCloseCounter;
-    }
-
-    /**
-     * @return Returns the dataResendCounter.
-     */
-    public long getDataResendCounter() {
-        return dataResendCounter;
-    }
-
-    /**
-     * @return Returns the dataFailureCounter.
-     */
-    public long getDataFailureCounter() {
-        return dataFailureCounter;
-    }
     
     /**
      * @param address The address to set.
@@ -598,14 +354,12 @@
      * Connect other cluster member receiver 
      * @see org.apache.catalina.tribes.tcp.IDataSender#connect()
      */
-    public synchronized void connect() throws java.io.IOException {
+    public synchronized void connect() throws ChannelException {
         if(!isMessageTransferStarted) {
             openSocket();
             if(isConnected()) {
-                connectCounter++;
                 if (log.isDebugEnabled())
-                    log.debug(sm.getString("IDataSender.connect", address.getHostAddress(),
-                            new Integer(port),new Long(connectCounter)));
+                    log.debug(sm.getString("IDataSender.connect", address.getHostAddress(),new Integer(port),new Long(0)));
             }
         } else 
             if (log.isWarnEnabled())
@@ -623,10 +377,8 @@
             boolean connect = isConnected() ;
             closeSocket();
             if(connect) {
-                disconnectCounter++;
                 if (log.isDebugEnabled())
-                    log.debug(sm.getString("IDataSender.disconnect", address.getHostAddress(),
-                        new Integer(port),new Long(disconnectCounter)));
+                    log.debug(sm.getString("IDataSender.disconnect", address.getHostAddress(),new Integer(port),new Long(0)));
             }
         } else 
             if (log.isWarnEnabled())
@@ -664,33 +416,17 @@
      * @see org.apache.catalina.tribes.tcp.IDataSender#sendMessage(,
      *      ChannelMessage)
      */
-    public synchronized void sendMessage(ChannelMessage data)
-            throws java.io.IOException {
-        pushMessage(data);
-    }
-
-    /**
-     * Reset sender statistics
-     */
-    public synchronized void resetStatistics() {
-        nrOfRequests = 0;
-        totalBytes = 0;
-        disconnectCounter = 0;
-        connectCounter = isConnected() ? 1 : 0;
-        missingAckCounter = 0;
-        dataResendCounter = 0;
-        dataFailureCounter = 0 ;
-        socketOpenCounter =isConnected() ? 1 : 0;
-        socketOpenFailureCounter = 0 ;
-        socketCloseCounter = 0;
-        processingTime = 0 ;
-        minProcessingTime = Long.MAX_VALUE ;
-        maxProcessingTime = 0 ;
-        waitAckTime = 0 ;
-        minWaitAckTime = Long.MAX_VALUE ;
-        maxWaitAckTime = 0 ;
+    public synchronized void sendMessage(ChannelMessage data) throws ChannelException {
+        try {
+            pushMessage(data);
+        }catch ( Exception x ) {
+            ChannelException cx = new ChannelException(x);
+            cx.addFaultyMember(data.getAddress());
+            throw cx;
+        }
     }
 
+    
     /**
      * Name of this SockerSender
      */
@@ -705,27 +441,23 @@
     /**
      * open real socket and set time out when waitForAck is enabled
      * is socket open return directly
-     * @throws IOException
-     * @throws SocketException
      */
-    protected void openSocket() throws IOException, SocketException {
+    protected void openSocket() throws ChannelException {
        if(isConnected())
            return ;
        try {
             createSocket();
             if (getWaitForAck()) socket.setSoTimeout((int) ackTimeout);
             isSocketConnected = true;
-            socketOpenCounter++;
             this.keepAliveCount = 0;
             this.keepAliveConnectTime = System.currentTimeMillis();
             if (log.isDebugEnabled())
-                log.debug(sm.getString("IDataSender.openSocket", address.getHostAddress(), new Integer(port),new Long(socketOpenCounter)));
+                log.debug(sm.getString("IDataSender.openSocket", address.getHostAddress(), new Integer(port),new Long(0)));
       } catch (IOException ex1) {
           getSenderState().setSuspect();
-          socketOpenFailureCounter++ ;
           if (log.isDebugEnabled())
-              log.debug(sm.getString("IDataSender.openSocket.failure",address.getHostAddress(), new Integer(port),new Long(socketOpenFailureCounter)), ex1);
-          throw ex1;
+              log.debug(sm.getString("IDataSender.openSocket.failure",address.getHostAddress(), new Integer(port),new Long(0)), ex1);
+          throw new ChannelException(ex1);
         }
         
      }
@@ -736,9 +468,6 @@
      */
     protected void createSocket() throws IOException, SocketException {
         socket = new Socket(getAddress(), getPort());
-//System.out.println("DEFAULT SOCKET RX="+socket.getReceiveBufferSize() +" our="+getRxBufSize());
-//System.out.println("DEFAULT CHANNEL TX="+socket.getSendBufferSize() +" our="+getTxBufSize());
-//
         socket.setSendBufferSize(getTxBufSize());
         socket.setReceiveBufferSize(getRxBufSize());
         this.socketout = socket.getOutputStream();
@@ -762,56 +491,12 @@
             }
             this.keepAliveCount = 0;
             isSocketConnected = false;
-            socketCloseCounter++;
             if (log.isDebugEnabled())
-                log.debug(sm.getString("IDataSender.closeSocket",address.getHostAddress(), new Integer(port),new Long(socketCloseCounter)));
+                log.debug(sm.getString("IDataSender.closeSocket",address.getHostAddress(), new Integer(port),new Long(0)));
        }
     }
 
     /**
-     * Add statistic for this socket instance
-     * 
-     * @param length
-     */
-    protected void addStats(int length) {
-        nrOfRequests++;
-        totalBytes += length;
-        if (log.isDebugEnabled() && (nrOfRequests % 1000) == 0) {
-            log.debug(sm.getString("IDataSender.stats", new Object[] {
-                    getAddress().getHostAddress(), new Integer(getPort()),
-                    new Long(totalBytes), new Long(nrOfRequests),
-                    new Long(totalBytes / nrOfRequests),
-                    new Long(getProcessingTime()),
-                    new Double(getAvgProcessingTime())}));
-        }
-    }
-
-    /**
-     * Add processing stats times
-     * @param startTime
-     */
-    protected void addProcessingStats(long startTime) {
-        long time = System.currentTimeMillis() - startTime ;
-        if(time < minProcessingTime)
-            minProcessingTime = time ;
-        if( time > maxProcessingTime)
-            maxProcessingTime = time ;
-        processingTime += time ;
-    }
-    
-    /**
-     * Add waitAck stats times
-     * @param startTime
-     */
-    protected void addWaitAckStats(long startTime) {
-        long time = System.currentTimeMillis() - startTime ;
-        if(time < minWaitAckTime)
-            minWaitAckTime = time ;
-        if( time > maxWaitAckTime)
-            maxWaitAckTime = time ;
-        waitAckTime += time ;
-    }
-    /**
      * Push messages with only one socket at a time
      * Wait for ack is needed and make auto retry when write message is failed.
      * After sending error close and reopen socket again.
@@ -826,54 +511,50 @@
      * 
      * @param data
      *            data to send
-     * @throws java.io.IOException
      * @since 5.5.10
      */
     
-    protected void pushMessage(ChannelMessage data, boolean reconnect) throws java.io.IOException {
+    protected void pushMessage(ChannelMessage data, boolean reconnect) throws ChannelException {
         synchronized(this) {
             checkKeepAlive();
             if ( reconnect ) closeSocket();
             if (!isConnected()) openSocket();
             else if(keepAliveTimeout > -1) this.keepAliveConnectTime = System.currentTimeMillis();
         }
-        writeData(data);
-        
+        try {
+            writeData(data);
+        } catch ( IOException x ) {
+            throw new ChannelException(x);
+        }
     }
     
-    protected void pushMessage( ChannelMessage data) throws java.io.IOException {
-        long time = 0 ;
-        if(doProcessingStats) time = System.currentTimeMillis();
+    protected void pushMessage( ChannelMessage data) throws ChannelException {
         boolean messageTransfered = false ;
-        IOException exception = null;
+        ChannelException exception = null;
         try {
              // first try with existing connection
              pushMessage(data,false);
              messageTransfered = true ;
-        } catch (java.io.IOException x) {
+        } catch (ChannelException x) {
             exception = x;
             //resend
-            dataResendCounter++;
             if (log.isTraceEnabled()) log.trace(sm.getString("IDataSender.send.again", address.getHostAddress(),new Integer(port)),x);
             try {
                 // second try with fresh connection
                 pushMessage(data,true);                    
                 messageTransfered = true;
                 exception = null;
-            } catch (IOException xx) {
+            } catch (ChannelException xx) {
                 exception = xx;
                 closeSocket();
             }
         } finally {
             this.keepAliveCount++;
             checkKeepAlive();
-            if(doProcessingStats) addProcessingStats(time);
             if(messageTransfered) {
-                addStats(data.getMessage().getLength());
                 if (log.isTraceEnabled()) log.trace(sm.getString("IDataSender.send.message", address.getHostAddress(),new Integer(port), data.getUniqueId(), new Long(data.getMessage().getLength())));
             } else {
-                dataFailureCounter++;
-                if ( exception != null ) throw exception;
+                if ( exception != null ) throw new ChannelException(exception);
             }
         }
     }
@@ -908,11 +589,6 @@
      * @throws java.net.SocketTimeoutException
      */
     protected synchronized void waitForAck(long timeout) throws java.io.IOException {
-        long time = 0 ;
-        
-        if(doWaitAckStats) {
-            time = System.currentTimeMillis();
-        }
         try {
             boolean ackReceived = false;
             ackbuf.clear();
@@ -933,7 +609,6 @@
                 else throw new IOException(sm.getString("IDataSender.ack.wrong",getAddress(), new Integer(socket.getLocalPort())));
             }
         } catch (IOException x) {
-            missingAckCounter++;
             String errmsg = sm.getString("IDataSender.ack.missing", getAddress(),new Integer(socket.getLocalPort()), new Long(this.ackTimeout));
             if ( !this.isSuspect() ) {
                 this.setSuspect(true);
@@ -944,7 +619,6 @@
             throw x;
         } finally {
             ackbuf.clear();
-            if(doWaitAckStats) addWaitAckStats(time);
         }
     }
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java?rev=382506&r1=382505&r2=382506&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java Thu Mar  2 12:32:47 2006
@@ -27,6 +27,7 @@
 import org.apache.catalina.tribes.io.XByteBuffer;
 import java.util.Iterator;
 import java.nio.channels.SelectionKey;
+import org.apache.catalina.tribes.tcp.MultiPointSender;
 
 /**
  * <p>Title: </p>
@@ -40,7 +41,7 @@
  * @author not attributable
  * @version 1.0
  */
-public class ParallelNioSender {
+public class ParallelNioSender implements MultiPointSender {
     protected long timeout = 15000;
     protected long selectTimeout = 50; 
     protected boolean waitForAck = false;
@@ -51,6 +52,8 @@
     protected boolean directBuf = false;
     protected int rxBufSize = 43800;
     protected int txBufSize = 25188;
+    protected boolean suspect = false;
+    
     public ParallelNioSender(long timeout, 
                              boolean waitForAck,
                              int retryAttempts,
@@ -89,7 +92,7 @@
                 throw cx;
             }
         } catch (Exception x ) {
-            try { this.close(); } catch (Exception ignore) {}
+            try { this.disconnect(); } catch (Exception ignore) {}
             if ( x instanceof ChannelException ) throw (ChannelException)x;
             else throw new ChannelException(x);
         }
@@ -184,7 +187,12 @@
         return result;
     }
     
-    public synchronized void close() throws ChannelException  {
+    public void connect() {
+        //do nothing, we connect on demand
+    }
+    
+    
+    private synchronized void close() throws ChannelException  {
         ChannelException x = null;
         Object[] members = nioSenders.keySet().toArray();
         for (int i=0; i<members.length; i++ ) {
@@ -201,8 +209,45 @@
         if ( x != null ) throw x;
     }
     
+    public synchronized void disconnect() {
+        try {close(); }catch (Exception x){}
+    }
+    
     public void finalize() {
-        try {close(); }catch ( Exception ignore){}
+        try {disconnect(); }catch ( Exception ignore){}
     }
+    
+    public boolean getSuspect() {
+        return suspect;
+    }
+    public void setSuspect(boolean suspect) {
+        this.suspect = suspect;
+    }
+    
+    public void setUseDirectBuffer(boolean directBuf) {
+        this.directBuf = directBuf;
+    }
+    
+    public void setMaxRetryAttempts(int attempts) {
+        this.retryAttempts = attempts;
+    }
+    
+    public void setTxBufSize(int size) {
+        this.txBufSize = size;
+    }
+    
+    public void setRxBufSize(int size) {
+        this.rxBufSize = size;
+    }
+    
+    public void setWaitForAck(boolean wait) {
+        this.waitForAck = wait;
+    }
+    
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+    
+    
 
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org