You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/14 03:00:08 UTC
svn commit: r385711 - in /tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp: ./ bio/ nio/
Author: fhanik
Date: Mon Mar 13 18:00:05 2006
New Revision: 385711
URL: http://svn.apache.org/viewcvs?rev=385711&view=rev
Log:
Completed more of the interfaces
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java?rev=385711&r1=385710&r2=385711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java Mon Mar 13 18:00:05 2006
@@ -15,14 +15,7 @@
*/
package org.apache.catalina.tribes.tcp;
-import java.io.IOException;
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.tcp.DataSender;
-import org.apache.catalina.tribes.tcp.MultiPointSender;
-import org.apache.catalina.tribes.tcp.PooledSender;
/**
* <p>Title: </p>
@@ -41,6 +34,7 @@
protected boolean useDirectBuffer;
protected int maxRetryAttempts;
protected boolean autoConnect;
+ protected int keepAliveCount;
public AbstractPooledSender() {
super();
}
@@ -61,6 +55,10 @@
this.autoConnect = autoConnect;
}
+ public void setKeepAliveCount(int keepAliveCount) {
+ this.keepAliveCount = keepAliveCount;
+ }
+
public boolean getSuspect() {
return suspect;
}
@@ -75,5 +73,9 @@
public boolean isAutoConnect() {
return autoConnect;
+ }
+
+ public int getKeepAliveCount() {
+ return keepAliveCount;
}
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=385711&r1=385710&r2=385711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java Mon Mar 13 18:00:05 2006
@@ -15,7 +15,7 @@
*/
package org.apache.catalina.tribes.tcp;
-import org.apache.catalina.tribes.ChannelException;
+import java.io.IOException;
/**
* <p>Title: </p>
@@ -30,7 +30,7 @@
* @version 1.0
*/
public interface DataSender {
- public void connect() throws ChannelException;
+ public void connect() throws IOException;
public void disconnect();
public boolean isConnected();
public void setRxBufSize(int size);
@@ -38,4 +38,5 @@
public boolean keepalive();
public void setTimeout(long timeout);
public void setWaitForAck(boolean isWaitForAck);
+ public void setKeepAliveCount(int maxRequests);
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java?rev=385711&r1=385710&r2=385711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java Mon Mar 13 18:00:05 2006
@@ -35,7 +35,7 @@
public void setSuspect(boolean suspect);
public boolean getSuspect();
public void memberAdded(Member member);
- public void memberRemoved(Member member);
+ public void memberDisappeared(Member member);
public void setAutoConnect(boolean auto);
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java?rev=385711&r1=385710&r2=385711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java Mon Mar 13 18:00:05 2006
@@ -15,10 +15,9 @@
*/
package org.apache.catalina.tribes.tcp;
+import java.io.IOException;
import java.util.List;
-import org.apache.catalina.tribes.ChannelException;
-
/**
* <p>Title: </p>
*
@@ -31,7 +30,7 @@
* @author not attributable
* @version 1.0
*/
-public abstract class PooledSender implements DataSender {
+public abstract class PooledSender implements MultiPointSender {
private SenderQueue queue = null;
private boolean connected;
@@ -40,6 +39,7 @@
private boolean waitForAck;
private long timeout;
private int poolSize = 25;
+ private boolean suspect;
public PooledSender() {
queue = new SenderQueue(this,poolSize);
@@ -56,7 +56,7 @@
queue.returnSender(sender);
}
- public synchronized void connect() throws ChannelException {
+ public synchronized void connect() throws IOException {
//do nothing, happens in the socket sender itself
queue.open();
setConnected(true);
@@ -99,6 +99,10 @@
public void setPoolSize(int poolSize) {
this.poolSize = poolSize;
queue.setLimit(poolSize);
+ }
+
+ public void setSuspect(Boolean suspect) {
+ this.suspect = suspect;
}
public boolean isConnected() {
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=385711&r1=385710&r2=385711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java Mon Mar 13 18:00:05 2006
@@ -147,7 +147,7 @@
* @see org.apache.catalina.tribes.ClusterSender#remove(org.apache.catalina.tribes.Member)
*/
public synchronized void remove(Member member) {
- getTransport().memberRemoved(member);
+ getTransport().memberDisappeared(member);
}
// ------------------------------------------------------------- protected
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java?rev=385711&r1=385710&r2=385711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java Mon Mar 13 18:00:05 2006
@@ -18,8 +18,8 @@
import java.io.IOException;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.net.Socket;
-import java.net.SocketException;
import java.util.Arrays;
import org.apache.catalina.tribes.ChannelException;
@@ -28,7 +28,6 @@
import org.apache.catalina.tribes.tcp.DataSender;
import org.apache.catalina.tribes.tcp.SenderState;
import org.apache.catalina.util.StringManager;
-import java.net.InetSocketAddress;
/**
* Send cluster messages with only one socket. Ack and keep Alive Handling is
@@ -100,7 +99,7 @@
/**
* max requests before reconnecting (default -1 unlimited)
*/
- private int keepAliveMaxRequestCount = -1;
+ private int keepAliveCount = -1;
/**
* Last connect timestamp
@@ -110,7 +109,7 @@
/**
* keepalive counter
*/
- protected int keepAliveCount = 0;
+ protected int requestCount = 0;
/**
* wait for receiver Ack
@@ -222,12 +221,12 @@
this.keepAliveTimeout = keepAliveTimeout;
}
- public int getKeepAliveMaxRequestCount() {
- return keepAliveMaxRequestCount;
+ public int getKeepAliveCount() {
+ return keepAliveCount;
}
- public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) {
- this.keepAliveMaxRequestCount = keepAliveMaxRequestCount;
+ public void setKeepAliveCount(int keepAliveMaxRequestCount) {
+ this.keepAliveCount = keepAliveMaxRequestCount;
}
/**
@@ -240,8 +239,8 @@
/**
* @return Returns the keepAliveCount.
*/
- public int getKeepAliveCount() {
- return keepAliveCount;
+ public int getRequestCount() {
+ return requestCount;
}
/**
@@ -298,12 +297,8 @@
* Connect other cluster member receiver
* @see org.apache.catalina.tribes.tcp.IDataSender#connect()
*/
- public void connect() throws ChannelException {
- try {
- openSocket();
- }catch ( Exception x ) {
- throw new ChannelException(x);
- }
+ public void connect() throws IOException {
+ openSocket();
}
@@ -334,7 +329,7 @@
boolean isCloseSocket = true ;
if(isConnected()) {
if ((keepAliveTimeout > -1 && (System.currentTimeMillis() - keepAliveConnectTime) > keepAliveTimeout)
- || (keepAliveMaxRequestCount > -1 && keepAliveCount >= keepAliveMaxRequestCount)) {
+ || (keepAliveCount > -1 && requestCount >= keepAliveCount)) {
closeSocket();
} else
isCloseSocket = false ;
@@ -380,7 +375,7 @@
socket.setReceiveBufferSize(getRxBufSize());
socket.setSoTimeout( (int) timeout);
connected = true;
- this.keepAliveCount = 0;
+ this.requestCount = 0;
this.keepAliveConnectTime = System.currentTimeMillis();
if (log.isDebugEnabled())
log.debug(sm.getString("IDataSender.openSocket", address.getHostAddress(), new Integer(port), new Long(0)));
@@ -409,7 +404,7 @@
socket = null;
}
}
- this.keepAliveCount = 0;
+ this.requestCount = 0;
connected = false;
if (log.isDebugEnabled())
log.debug(sm.getString("IDataSender.closeSocket",address.getHostAddress(), new Integer(port),new Long(0)));
@@ -463,7 +458,7 @@
closeSocket();
}
} finally {
- this.keepAliveCount++;
+ this.requestCount++;
keepalive();
if(messageTransfered) {
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java?rev=385711&r1=385710&r2=385711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java Mon Mar 13 18:00:05 2006
@@ -11,6 +11,7 @@
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.tcp.MultiPointSender;
import org.apache.catalina.tribes.tcp.SenderState;
+import java.io.IOException;
/**
* <p>Title: </p>
@@ -59,7 +60,7 @@
- private BioSender[] setupForSend(Member[] destination) throws ChannelException {
+ protected BioSender[] setupForSend(Member[] destination) throws ChannelException {
ChannelException cx = null;
BioSender[] result = new BioSender[destination.length];
for ( int i=0; i<destination.length; i++ ) {
@@ -68,6 +69,10 @@
if (sender == null) {
InetAddress dest = InetAddress.getByAddress(destination[i].getHost());
sender = new BioSender(dest, destination[i].getPort(), new SenderState(), rxBufSize, txBufSize);
+ sender.setKeepAliveCount(keepAliveCount);
+ sender.setTimeout(timeout);
+ //sender.setResend();
+ //sender.setKeepAliveTimeout();
bioSenders.put(destination[i], sender);
}
sender.setWaitForAck(waitForAck);
@@ -83,7 +88,7 @@
else return result;
}
- public void connect() {
+ public void connect() throws IOException {
//do nothing, we connect on demand
setConnected(true);
}
@@ -110,7 +115,7 @@
}
- public void memberRemoved(Member member) {
+ public void memberDisappeared(Member member) {
//disconnect senders
BioSender sender = (BioSender)bioSenders.remove(member);
if ( sender != null ) sender.disconnect();
@@ -172,6 +177,10 @@
public void setAutoConnect(boolean autoConnect) {
this.autoConnect = autoConnect;
+ }
+
+ public void setKeepAliveCount(int keepAliveCount) {
+ this.keepAliveCount = keepAliveCount;
}
public boolean keepalive() {
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java?rev=385711&r1=385710&r2=385711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java Mon Mar 13 18:00:05 2006
@@ -2,6 +2,10 @@
import org.apache.catalina.tribes.tcp.DataSender;
import org.apache.catalina.tribes.tcp.PooledSender;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.tcp.MultiPointSender;
+import org.apache.catalina.tribes.ChannelMessage;
/**
* <p>Title: </p>
@@ -26,10 +30,16 @@
protected int txBufSize = 25188;
protected boolean suspect = false;
private boolean autoConnect;
+ private boolean useDirectBuffer;
-
public PooledMultiSender() {
}
+
+ public void sendMessage(Member[] destination, ChannelMessage msg) throws ChannelException {
+ MultiPointSender sender = (MultiPointSender)getSender();
+ sender.sendMessage(destination,msg);
+
+ }
/**
* getNewDataSender
@@ -61,11 +71,32 @@
this.keepAliveCount = keepAliveCount;
}
- public void setRetryAttempts(int retryAttempts) {
+ public void setMaxRetryAttempts(int retryAttempts) {
this.retryAttempts = retryAttempts;
}
public void setSuspect(boolean suspect) {
this.suspect = suspect;
}
+
+ public void setUseDirectBuffer(boolean useDirectBuffer) {
+ this.useDirectBuffer = useDirectBuffer;
+ }
+
+ public boolean getSuspect() {
+ return suspect;
+ }
+
+ public boolean isUseDirectBuffer() {
+ return useDirectBuffer;
+ }
+
+ public void memberAdded(Member member) {
+
+ }
+
+ public void memberDisappeared(Member member) {
+ //disconnect senders
+ }
+
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java?rev=385711&r1=385710&r2=385711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java Mon Mar 13 18:00:05 2006
@@ -28,6 +28,7 @@
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.tcp.DataSender;
/**
* This class is NOT thread safe and should never be used with more than one thread at a time
@@ -43,7 +44,7 @@
* @author Filip Hanik
* @version 1.0
*/
-public class NioSender {
+public class NioSender implements DataSender{
protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(NioSender.class);
@@ -67,8 +68,10 @@
protected int curPos=0;
protected XByteBuffer ackbuf = new XByteBuffer(128,true);
protected int remaining = 0;
- private boolean complete;
- private int attempt;
+ protected boolean complete;
+ protected int attempt;
+ protected int keepAliveCount;
+ protected long timeout;
public NioSender(Member destination) {
this.destination = destination;
@@ -90,6 +93,7 @@
connected = true;
socketChannel.socket().setSendBufferSize(txBufSize);
socketChannel.socket().setReceiveBufferSize(rxBufSize);
+ socketChannel.socket().setSoTimeout((int)timeout);
if ( current != null ) key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
return false;
} else {
@@ -190,6 +194,7 @@
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(addr);
+
socketChannel.register(getSelector(),SelectionKey.OP_CONNECT,this);
}
@@ -321,6 +326,10 @@
return attempt;
}
+ public long getTimeout() {
+ return timeout;
+ }
+
/**
* setRxBufSize
*
@@ -375,5 +384,13 @@
public void setAttempt(int attempt) {
this.attempt = attempt;
+ }
+
+ public void setKeepAliveCount(int keepAliveCount) {
+ this.keepAliveCount = keepAliveCount;
+ }
+
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
}
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java?rev=385711&r1=385710&r2=385711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java Mon Mar 13 18:00:05 2006
@@ -47,7 +47,7 @@
protected long selectTimeout = 1000;
protected boolean waitForAck = false;
protected int retryAttempts=0;
- protected int keepAliveCount = Integer.MAX_VALUE;
+ protected int keepAliveCount = -1;
protected Selector selector;
protected HashMap nioSenders = new HashMap();
protected boolean directBuf = false;
@@ -177,6 +177,8 @@
sender.setRxBufSize(rxBufSize);
sender.setTxBufSize(txBufSize);
sender.setWaitForAck(waitForAck);
+ sender.setTimeout(timeout);
+ sender.setKeepAliveCount(keepAliveCount);
result[i] = sender;
}
return result;
@@ -209,7 +211,7 @@
}
- public void memberRemoved(Member member) {
+ public void memberDisappeared(Member member) {
//disconnect senders
NioSender sender = (NioSender)nioSenders.remove(member);
if ( sender != null ) sender.disconnect();
@@ -271,6 +273,10 @@
public void setAutoConnect(boolean autoConnect) {
this.autoConnect = autoConnect;
+ }
+
+ public void setKeepAliveCount(int keepAliveCount) {
+ this.keepAliveCount = keepAliveCount;
}
public boolean keepalive() {
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java?rev=385711&r1=385710&r2=385711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java Mon Mar 13 18:00:05 2006
@@ -69,7 +69,7 @@
}
- public void memberRemoved(Member member) {
+ public void memberDisappeared(Member member) {
//disconnect senders
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org