You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/02 21:32:48 UTC
svn commit: r382506 - in /tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp: ./ bio/ nio/
Author: fhanik
Date: Thu Mar 2 12:32:47 2006
New Revision: 382506
URL: http://svn.apache.org/viewcvs?rev=382506&view=rev
Log:
More refactor and cleanup
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/FastAsyncSocketSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=382506&r1=382505&r2=382506&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java Thu Mar 2 12:32:47 2006
@@ -15,6 +15,8 @@
*/
package org.apache.catalina.tribes.tcp;
+import org.apache.catalina.tribes.ChannelException;
+
/**
* <p>Title: </p>
*
@@ -28,7 +30,7 @@
* @version 1.0
*/
public interface DataSender {
- public void connect() throws java.io.IOException;
+ public void connect() throws ChannelException;
public void disconnect();
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=382506&r1=382505&r2=382506&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java Thu Mar 2 12:32:47 2006
@@ -280,7 +280,7 @@
}
- public void sendMessage(ChannelMessage message, Member destination) throws IOException {
+ public void sendMessage(ChannelMessage message, Member destination) throws ChannelException {
Object key = getKey(destination);
SinglePointSender sender = (SinglePointSender) map.get(key);
sendMessageData(message, sender);
@@ -435,7 +435,7 @@
* @throws java.io.IOException If an error occurs
*/
protected void sendMessageData(ChannelMessage data,
- SinglePointSender sender) throws IOException {
+ SinglePointSender sender) throws ChannelException {
if (sender == null)
throw new RuntimeException("Sender not available. Make sure sender information is available to the ReplicationTransmitter.");
try {
@@ -447,7 +447,7 @@
}
sender.sendMessage(data);
sender.setSuspect(false);
- } catch (IOException x) {
+ } catch (ChannelException x) {
if (!sender.getSuspect()) {
if (log.isErrorEnabled() ) log.error("Unable to send replicated message, is member ["+sender.toString()+"] down?",x);
} else if (log.isDebugEnabled() ) {
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java?rev=382506&r1=382505&r2=382506&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java Thu Mar 2 12:32:47 2006
@@ -17,6 +17,7 @@
package org.apache.catalina.tribes.tcp;
import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.ChannelException;
/**
* @author Filip Hanik
@@ -31,7 +32,7 @@
public java.net.InetAddress getAddress();
public void setPort(int port);
public int getPort();
- public void sendMessage(ChannelMessage data) throws java.io.IOException;
+ public void sendMessage(ChannelMessage data) throws ChannelException;
public boolean isConnected();
public void setSuspect(boolean suspect);
public boolean getSuspect();
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/FastAsyncSocketSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/FastAsyncSocketSender.java?rev=382506&r1=382505&r2=382506&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/FastAsyncSocketSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/FastAsyncSocketSender.java Thu Mar 2 12:32:47 2006
@@ -24,6 +24,7 @@
import org.apache.catalina.tribes.util.LinkObject;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.tcp.*;
+import org.apache.catalina.tribes.ChannelException;
/**
* Send cluster messages from a Message queue with only one socket. Ack and keep
@@ -281,7 +282,7 @@
*
* @see org.apache.catalina.tribes.tcp.IDataSender#connect()
*/
- public void connect() throws java.io.IOException {
+ public void connect() throws ChannelException {
super.connect();
checkThread();
if(!queue.isEnabled())
@@ -314,8 +315,7 @@
*
* @see org.apache.catalina.tribes.tcp.DataSender#pushMessage(ChannelMessage)
*/
- public void sendMessage(ChannelMessage data)
- throws java.io.IOException {
+ public void sendMessage(ChannelMessage data) throws ChannelException {
queue.add(getUniqueIdAsString(data.getUniqueId()), data);
synchronized (this) {
inQueueCounter++;
@@ -328,15 +328,6 @@
data.getMessage().getLength())));
}
- /**
- * Reset sender statistics
- */
- public synchronized void resetStatistics() {
- super.resetStatistics();
- inQueueCounter = queue.getSize();
- outQueueCounter = 0;
- queue.resetStatistics();
- }
/**
* Name of this SockerSender
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java?rev=382506&r1=382505&r2=382506&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java Thu Mar 2 12:32:47 2006
@@ -19,14 +19,13 @@
import java.io.IOException;
import java.net.InetAddress;
import java.util.LinkedList;
+
import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.tcp.*;
+import org.apache.catalina.tribes.ChannelException;
/**
* Send cluster messages with a pool of sockets (25).
*
- * FIXME support processing stats
- *
* @author Filip Hanik
* @author Peter Rossbach
* @version 1.2
@@ -92,17 +91,15 @@
// ----------------------------------------------------- Public Methode
- public synchronized void connect() throws java.io.IOException {
+ public synchronized void connect() throws ChannelException {
//do nothing, happens in the socket sender itself
senderQueue.open();
setSocketConnected(true);
- connectCounter++;
}
public synchronized void disconnect() {
senderQueue.close();
setSocketConnected(false);
- disconnectCounter++;
}
/**
@@ -112,7 +109,7 @@
* @param data Message data
* @throws java.io.IOException
*/
- public void sendMessage(ChannelMessage data) throws IOException {
+ public void sendMessage(ChannelMessage data) throws ChannelException {
//get a socket sender from the pool
if(!isConnected()) {
synchronized(this) {
@@ -132,7 +129,6 @@
//return the connection to the pool
senderQueue.returnSender(sender);
}
- addStats(data.getMessage().getLength());
}
public String toString() {
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java?rev=382506&r1=382505&r2=382506&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java Thu Mar 2 12:32:47 2006
@@ -28,6 +28,7 @@
import org.apache.catalina.util.StringManager;
import java.util.Arrays;
import org.apache.catalina.tribes.tcp.*;
+import org.apache.catalina.tribes.ChannelException;
/**
* Send cluster messages with only one socket. Ack and keep Alive Handling is
@@ -97,79 +98,10 @@
private long ackTimeout;
/**
- * number of requests
- */
- protected long nrOfRequests = 0;
-
- /**
- * total bytes to transfer
- */
- protected long totalBytes = 0;
-
- /**
- * number of connects
- */
- protected long connectCounter = 0;
-
- /**
- * number of explizit disconnects
- */
- protected long disconnectCounter = 0;
-
- /**
- * number of failing acks
- */
- protected long missingAckCounter = 0;
-
- /**
- * number of data resends (second trys after socket failure)
- */
- protected long dataResendCounter = 0;
-
- /**
- * number of data failure sends
- */
- protected long dataFailureCounter = 0;
-
- /**
- * doProcessingStats
- */
- protected boolean doProcessingStats = false;
-
- /**
- * proessingTime
- */
- protected long processingTime = 0;
-
- /**
- * min proessingTime
- */
- protected long minProcessingTime = Long.MAX_VALUE ;
-
- /**
- * max proessingTime
- */
- protected long maxProcessingTime = 0;
-
- /**
- * doWaitAckStats
- */
- protected boolean doWaitAckStats = false;
-
- /**
* waitAckTime
*/
protected long waitAckTime = 0;
- /**
- * min waitAckTime
- */
- protected long minWaitAckTime = Long.MAX_VALUE ;
-
- /**
- * max waitAckTime
- */
- protected long maxWaitAckTime = 0;
/**
* keep socket open for no more than one min
@@ -197,21 +129,6 @@
private boolean waitForAck = false;
/**
- * number of socket close
- */
- private int socketCloseCounter = 0 ;
-
- /**
- * number of socket open
- */
- private int socketOpenCounter = 0 ;
-
- /**
- * number of socket open failures
- */
- private int socketOpenFailureCounter = 0 ;
-
- /**
* After failure make a resend
*/
private boolean resend = false ;
@@ -260,167 +177,6 @@
}
- /**
- * @return Returns the nrOfRequests.
- */
- public long getNrOfRequests() {
- return nrOfRequests;
- }
-
- /**
- * @return Returns the totalBytes.
- */
- public long getTotalBytes() {
- return totalBytes;
- }
-
- /**
- * @return Returns the avg totalBytes/nrOfRequests.
- */
- public long getAvgMessageSize() {
- return totalBytes / nrOfRequests;
- }
-
- /**
- * @return Returns the avg processingTime/nrOfRequests.
- */
- public double getAvgProcessingTime() {
- return ((double)processingTime) / nrOfRequests;
- }
-
- /**
- * @return Returns the maxProcessingTime.
- */
- public long getMaxProcessingTime() {
- return maxProcessingTime;
- }
-
- /**
- * @return Returns the minProcessingTime.
- */
- public long getMinProcessingTime() {
- return minProcessingTime;
- }
-
- /**
- * @return Returns the processingTime.
- */
- public long getProcessingTime() {
- return processingTime;
- }
-
- /**
- * @return Returns the doProcessingStats.
- */
- public boolean isDoProcessingStats() {
- return doProcessingStats;
- }
-
- /**
- * @param doProcessingStats The doProcessingStats to set.
- */
- public void setDoProcessingStats(boolean doProcessingStats) {
- this.doProcessingStats = doProcessingStats;
- }
-
-
- /**
- * @return Returns the doWaitAckStats.
- */
- public boolean isDoWaitAckStats() {
- return doWaitAckStats;
- }
-
- /**
- * @param doWaitAckStats The doWaitAckStats to set.
- */
- public void setDoWaitAckStats(boolean doWaitAckStats) {
- this.doWaitAckStats = doWaitAckStats;
- }
-
- /**
- * @return Returns the avg waitAckTime/nrOfRequests.
- */
- public double getAvgWaitAckTime() {
- return ((double)waitAckTime) / nrOfRequests;
- }
-
- /**
- * @return Returns the maxWaitAckTime.
- */
- public long getMaxWaitAckTime() {
- return maxWaitAckTime;
- }
-
- /**
- * @return Returns the minWaitAckTime.
- */
- public long getMinWaitAckTime() {
- return minWaitAckTime;
- }
-
- /**
- * @return Returns the waitAckTime.
- */
- public long getWaitAckTime() {
- return waitAckTime;
- }
-
- /**
- * @return Returns the connectCounter.
- */
- public long getConnectCounter() {
- return connectCounter;
- }
-
- /**
- * @return Returns the disconnectCounter.
- */
- public long getDisconnectCounter() {
- return disconnectCounter;
- }
-
- /**
- * @return Returns the missingAckCounter.
- */
- public long getMissingAckCounter() {
- return missingAckCounter;
- }
-
- /**
- * @return Returns the socketOpenCounter.
- */
- public int getSocketOpenCounter() {
- return socketOpenCounter;
- }
-
- /**
- * @return Returns the socketOpenFailureCounter.
- */
- public int getSocketOpenFailureCounter() {
- return socketOpenFailureCounter;
- }
-
- /**
- * @return Returns the socketCloseCounter.
- */
- public int getSocketCloseCounter() {
- return socketCloseCounter;
- }
-
- /**
- * @return Returns the dataResendCounter.
- */
- public long getDataResendCounter() {
- return dataResendCounter;
- }
-
- /**
- * @return Returns the dataFailureCounter.
- */
- public long getDataFailureCounter() {
- return dataFailureCounter;
- }
/**
* @param address The address to set.
@@ -598,14 +354,12 @@
* Connect other cluster member receiver
* @see org.apache.catalina.tribes.tcp.IDataSender#connect()
*/
- public synchronized void connect() throws java.io.IOException {
+ public synchronized void connect() throws ChannelException {
if(!isMessageTransferStarted) {
openSocket();
if(isConnected()) {
- connectCounter++;
if (log.isDebugEnabled())
- log.debug(sm.getString("IDataSender.connect", address.getHostAddress(),
- new Integer(port),new Long(connectCounter)));
+ log.debug(sm.getString("IDataSender.connect", address.getHostAddress(),new Integer(port),new Long(0)));
}
} else
if (log.isWarnEnabled())
@@ -623,10 +377,8 @@
boolean connect = isConnected() ;
closeSocket();
if(connect) {
- disconnectCounter++;
if (log.isDebugEnabled())
- log.debug(sm.getString("IDataSender.disconnect", address.getHostAddress(),
- new Integer(port),new Long(disconnectCounter)));
+ log.debug(sm.getString("IDataSender.disconnect", address.getHostAddress(),new Integer(port),new Long(0)));
}
} else
if (log.isWarnEnabled())
@@ -664,33 +416,17 @@
* @see org.apache.catalina.tribes.tcp.IDataSender#sendMessage(,
* ChannelMessage)
*/
- public synchronized void sendMessage(ChannelMessage data)
- throws java.io.IOException {
- pushMessage(data);
- }
-
- /**
- * Reset sender statistics
- */
- public synchronized void resetStatistics() {
- nrOfRequests = 0;
- totalBytes = 0;
- disconnectCounter = 0;
- connectCounter = isConnected() ? 1 : 0;
- missingAckCounter = 0;
- dataResendCounter = 0;
- dataFailureCounter = 0 ;
- socketOpenCounter =isConnected() ? 1 : 0;
- socketOpenFailureCounter = 0 ;
- socketCloseCounter = 0;
- processingTime = 0 ;
- minProcessingTime = Long.MAX_VALUE ;
- maxProcessingTime = 0 ;
- waitAckTime = 0 ;
- minWaitAckTime = Long.MAX_VALUE ;
- maxWaitAckTime = 0 ;
+ public synchronized void sendMessage(ChannelMessage data) throws ChannelException {
+ try {
+ pushMessage(data);
+ }catch ( Exception x ) {
+ ChannelException cx = new ChannelException(x);
+ cx.addFaultyMember(data.getAddress());
+ throw cx;
+ }
}
+
/**
* Name of this SockerSender
*/
@@ -705,27 +441,23 @@
/**
* open real socket and set time out when waitForAck is enabled
* is socket open return directly
- * @throws IOException
- * @throws SocketException
*/
- protected void openSocket() throws IOException, SocketException {
+ protected void openSocket() throws ChannelException {
if(isConnected())
return ;
try {
createSocket();
if (getWaitForAck()) socket.setSoTimeout((int) ackTimeout);
isSocketConnected = true;
- socketOpenCounter++;
this.keepAliveCount = 0;
this.keepAliveConnectTime = System.currentTimeMillis();
if (log.isDebugEnabled())
- log.debug(sm.getString("IDataSender.openSocket", address.getHostAddress(), new Integer(port),new Long(socketOpenCounter)));
+ log.debug(sm.getString("IDataSender.openSocket", address.getHostAddress(), new Integer(port),new Long(0)));
} catch (IOException ex1) {
getSenderState().setSuspect();
- socketOpenFailureCounter++ ;
if (log.isDebugEnabled())
- log.debug(sm.getString("IDataSender.openSocket.failure",address.getHostAddress(), new Integer(port),new Long(socketOpenFailureCounter)), ex1);
- throw ex1;
+ log.debug(sm.getString("IDataSender.openSocket.failure",address.getHostAddress(), new Integer(port),new Long(0)), ex1);
+ throw new ChannelException(ex1);
}
}
@@ -736,9 +468,6 @@
*/
protected void createSocket() throws IOException, SocketException {
socket = new Socket(getAddress(), getPort());
-//System.out.println("DEFAULT SOCKET RX="+socket.getReceiveBufferSize() +" our="+getRxBufSize());
-//System.out.println("DEFAULT CHANNEL TX="+socket.getSendBufferSize() +" our="+getTxBufSize());
-//
socket.setSendBufferSize(getTxBufSize());
socket.setReceiveBufferSize(getRxBufSize());
this.socketout = socket.getOutputStream();
@@ -762,56 +491,12 @@
}
this.keepAliveCount = 0;
isSocketConnected = false;
- socketCloseCounter++;
if (log.isDebugEnabled())
- log.debug(sm.getString("IDataSender.closeSocket",address.getHostAddress(), new Integer(port),new Long(socketCloseCounter)));
+ log.debug(sm.getString("IDataSender.closeSocket",address.getHostAddress(), new Integer(port),new Long(0)));
}
}
/**
- * Add statistic for this socket instance
- *
- * @param length
- */
- protected void addStats(int length) {
- nrOfRequests++;
- totalBytes += length;
- if (log.isDebugEnabled() && (nrOfRequests % 1000) == 0) {
- log.debug(sm.getString("IDataSender.stats", new Object[] {
- getAddress().getHostAddress(), new Integer(getPort()),
- new Long(totalBytes), new Long(nrOfRequests),
- new Long(totalBytes / nrOfRequests),
- new Long(getProcessingTime()),
- new Double(getAvgProcessingTime())}));
- }
- }
-
- /**
- * Add processing stats times
- * @param startTime
- */
- protected void addProcessingStats(long startTime) {
- long time = System.currentTimeMillis() - startTime ;
- if(time < minProcessingTime)
- minProcessingTime = time ;
- if( time > maxProcessingTime)
- maxProcessingTime = time ;
- processingTime += time ;
- }
-
- /**
- * Add waitAck stats times
- * @param startTime
- */
- protected void addWaitAckStats(long startTime) {
- long time = System.currentTimeMillis() - startTime ;
- if(time < minWaitAckTime)
- minWaitAckTime = time ;
- if( time > maxWaitAckTime)
- maxWaitAckTime = time ;
- waitAckTime += time ;
- }
- /**
* Push messages with only one socket at a time
* Wait for ack is needed and make auto retry when write message is failed.
* After sending error close and reopen socket again.
@@ -826,54 +511,50 @@
*
* @param data
* data to send
- * @throws java.io.IOException
* @since 5.5.10
*/
- protected void pushMessage(ChannelMessage data, boolean reconnect) throws java.io.IOException {
+ protected void pushMessage(ChannelMessage data, boolean reconnect) throws ChannelException {
synchronized(this) {
checkKeepAlive();
if ( reconnect ) closeSocket();
if (!isConnected()) openSocket();
else if(keepAliveTimeout > -1) this.keepAliveConnectTime = System.currentTimeMillis();
}
- writeData(data);
-
+ try {
+ writeData(data);
+ } catch ( IOException x ) {
+ throw new ChannelException(x);
+ }
}
- protected void pushMessage( ChannelMessage data) throws java.io.IOException {
- long time = 0 ;
- if(doProcessingStats) time = System.currentTimeMillis();
+ protected void pushMessage( ChannelMessage data) throws ChannelException {
boolean messageTransfered = false ;
- IOException exception = null;
+ ChannelException exception = null;
try {
// first try with existing connection
pushMessage(data,false);
messageTransfered = true ;
- } catch (java.io.IOException x) {
+ } catch (ChannelException x) {
exception = x;
//resend
- dataResendCounter++;
if (log.isTraceEnabled()) log.trace(sm.getString("IDataSender.send.again", address.getHostAddress(),new Integer(port)),x);
try {
// second try with fresh connection
pushMessage(data,true);
messageTransfered = true;
exception = null;
- } catch (IOException xx) {
+ } catch (ChannelException xx) {
exception = xx;
closeSocket();
}
} finally {
this.keepAliveCount++;
checkKeepAlive();
- if(doProcessingStats) addProcessingStats(time);
if(messageTransfered) {
- addStats(data.getMessage().getLength());
if (log.isTraceEnabled()) log.trace(sm.getString("IDataSender.send.message", address.getHostAddress(),new Integer(port), data.getUniqueId(), new Long(data.getMessage().getLength())));
} else {
- dataFailureCounter++;
- if ( exception != null ) throw exception;
+ if ( exception != null ) throw new ChannelException(exception);
}
}
}
@@ -908,11 +589,6 @@
* @throws java.net.SocketTimeoutException
*/
protected synchronized void waitForAck(long timeout) throws java.io.IOException {
- long time = 0 ;
-
- if(doWaitAckStats) {
- time = System.currentTimeMillis();
- }
try {
boolean ackReceived = false;
ackbuf.clear();
@@ -933,7 +609,6 @@
else throw new IOException(sm.getString("IDataSender.ack.wrong",getAddress(), new Integer(socket.getLocalPort())));
}
} catch (IOException x) {
- missingAckCounter++;
String errmsg = sm.getString("IDataSender.ack.missing", getAddress(),new Integer(socket.getLocalPort()), new Long(this.ackTimeout));
if ( !this.isSuspect() ) {
this.setSuspect(true);
@@ -944,7 +619,6 @@
throw x;
} finally {
ackbuf.clear();
- if(doWaitAckStats) addWaitAckStats(time);
}
}
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java?rev=382506&r1=382505&r2=382506&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java Thu Mar 2 12:32:47 2006
@@ -27,6 +27,7 @@
import org.apache.catalina.tribes.io.XByteBuffer;
import java.util.Iterator;
import java.nio.channels.SelectionKey;
+import org.apache.catalina.tribes.tcp.MultiPointSender;
/**
* <p>Title: </p>
@@ -40,7 +41,7 @@
* @author not attributable
* @version 1.0
*/
-public class ParallelNioSender {
+public class ParallelNioSender implements MultiPointSender {
protected long timeout = 15000;
protected long selectTimeout = 50;
protected boolean waitForAck = false;
@@ -51,6 +52,8 @@
protected boolean directBuf = false;
protected int rxBufSize = 43800;
protected int txBufSize = 25188;
+ protected boolean suspect = false;
+
public ParallelNioSender(long timeout,
boolean waitForAck,
int retryAttempts,
@@ -89,7 +92,7 @@
throw cx;
}
} catch (Exception x ) {
- try { this.close(); } catch (Exception ignore) {}
+ try { this.disconnect(); } catch (Exception ignore) {}
if ( x instanceof ChannelException ) throw (ChannelException)x;
else throw new ChannelException(x);
}
@@ -184,7 +187,12 @@
return result;
}
- public synchronized void close() throws ChannelException {
+ public void connect() {
+ //do nothing, we connect on demand
+ }
+
+
+ private synchronized void close() throws ChannelException {
ChannelException x = null;
Object[] members = nioSenders.keySet().toArray();
for (int i=0; i<members.length; i++ ) {
@@ -201,8 +209,45 @@
if ( x != null ) throw x;
}
+ public synchronized void disconnect() {
+ try {close(); }catch (Exception x){}
+ }
+
public void finalize() {
- try {close(); }catch ( Exception ignore){}
+ try {disconnect(); }catch ( Exception ignore){}
}
+
+ public boolean getSuspect() {
+ return suspect;
+ }
+ public void setSuspect(boolean suspect) {
+ this.suspect = suspect;
+ }
+
+ public void setUseDirectBuffer(boolean directBuf) {
+ this.directBuf = directBuf;
+ }
+
+ public void setMaxRetryAttempts(int attempts) {
+ this.retryAttempts = attempts;
+ }
+
+ public void setTxBufSize(int size) {
+ this.txBufSize = size;
+ }
+
+ public void setRxBufSize(int size) {
+ this.rxBufSize = size;
+ }
+
+ public void setWaitForAck(boolean wait) {
+ this.waitForAck = wait;
+ }
+
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org