You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by pe...@apache.org on 2005/02/15 10:31:46 UTC
cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp DataSender.java AsyncSocketSender.java IDataSender.java PooledSocketSender.java ReplicationListener.java SimpleTcpCluster.java SocketSender.java TcpReplicationThread.java
pero 2005/02/15 01:31:46
Modified: modules/cluster/src/share/org/apache/catalina/cluster
ClusterReceiver.java ClusterSender.java
modules/cluster/src/share/org/apache/catalina/cluster/tcp
AsyncSocketSender.java IDataSender.java
PooledSocketSender.java ReplicationListener.java
SimpleTcpCluster.java SocketSender.java
TcpReplicationThread.java
Added: modules/cluster/src/share/org/apache/catalina/cluster/tcp
DataSender.java
Log:
Refactor all IDataSender and factor out a base class
Add some statistics attributes
i18n support to senders
Add KeepAlive and Ack Handling to AsyncSocketSender
Revision Changes Path
1.3 +9 -2 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java
Index: ClusterReceiver.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- ClusterReceiver.java 27 Feb 2004 14:58:55 -0000 1.2
+++ ClusterReceiver.java 15 Feb 2005 09:31:45 -0000 1.3
@@ -17,7 +17,13 @@
package org.apache.catalina.cluster;
-
+/**
+ *
+ * @author Filip Hanik
+ * @author Peter Rossbach
+ * @version 1.1
+ *
+ */
public interface ClusterReceiver
{
@@ -27,7 +33,8 @@
public void setCatalinaCluster(CatalinaCluster cluster);
- public void setIsSenderSynchronized(boolean isSenderSynchronized);
+ public boolean isWaitForAck();
+ public void setWaitForAck(boolean isWaitForAck);
public String getHost();
1.4 +9 -2 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterSender.java
Index: ClusterSender.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterSender.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- ClusterSender.java 27 Dec 2004 09:30:36 -0000 1.3
+++ ClusterSender.java 15 Feb 2005 09:31:45 -0000 1.4
@@ -19,7 +19,13 @@
import org.apache.catalina.cluster.tcp.SimpleTcpCluster;
-
+/**
+ *
+ * @author Filip Hanik
+ * @author Peter Rossbach
+ * @version 1.1
+ *
+ */
public interface ClusterSender
{
@@ -35,7 +41,8 @@
public void sendMessage(String messageId, byte[] indata) throws java.io.IOException;
- public boolean getIsSenderSynchronized();
+ public boolean isWaitForAck();
+ public void setWaitForAck(boolean isWaitForAck);
/**
* @param cluster
1.10 +157 -105 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java
Index: AsyncSocketSender.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- AsyncSocketSender.java 27 Dec 2004 09:30:36 -0000 1.9
+++ AsyncSocketSender.java 15 Feb 2005 09:31:45 -0000 1.10
@@ -17,173 +17,215 @@
package org.apache.catalina.cluster.tcp;
import java.net.InetAddress;
-import java.net.Socket;
+
import org.apache.catalina.cluster.util.SmartQueue;
/**
- * Send cluster messages from a Message queue with only one socket.
+ * Send cluster messages from a Message queue with only one socket. Ack and keep
+ * Alive Handling is supported.
+ * <ul>
+ * <li>With autoConnect=false at ReplicationTransmitter, you can disconnect the
+ * sender and all messages are queued. Only use this for small maintaince
+ * isuses!</li>
+ * <li>waitForAck=true, means that receiver ack the transfer</li>
+ * <li>after one minute idle time, or number of request (100) the connection is
+ * reconnected with next request. Change this for production use!</li>
+ * <li>default ackTimeout is 15 sec: this is very low for big all session replication messages after restart a node</li>
+ * <li>disable keepAlive: keepAliveTimeout="-1" and keepAliveMaxRequestCount="-1"</li>
+ * </ul>
*
* @author Filip Hanik
* @author Peter Rossbach
- * @version 1.1
+ * @version 1.2
*/
-public class AsyncSocketSender implements IDataSender {
+public class AsyncSocketSender extends DataSender {
+
private static int threadCounter = 1;
private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
.getLog(AsyncSocketSender.class);
- private InetAddress address;
-
- private int port;
-
- private Socket sc = null;
+ /**
+ * The descriptive information about this implementation.
+ */
+ private static final String info = "AsyncSocketSender/1.2";
- private boolean isSocketConnected = false;
+ // ----------------------------------------------------- Instance Variables
+ /**
+ * Message Queue
+ */
private SmartQueue queue = new SmartQueue();
- private boolean suspect;
-
+ /**
+ * Active thread to push messages asynchronous to the other replication node
+ */
private QueueThread queueThread = null;
- private long ackTimeout;
-
- private long nrOfRequests = 0;
-
- private long totalBytes = 0;
-
- private synchronized void addStats(int length) {
- nrOfRequests++;
- totalBytes += length;
- if (log.isDebugEnabled() && (nrOfRequests % 100) == 0) {
- log.debug("Send stats from " + getAddress().getHostAddress() + ":" + getPort()
- + "Nr of bytes sent=" + totalBytes + " over "
- + nrOfRequests + " ==" + (totalBytes / nrOfRequests)
- + " bytes/request");
- }
-
- }
+ /**
+ * Count number of queue message
+ */
+ private long inQueueCounter = 0;
/**
- * @return Returns the nrOfRequests.
+ * Count all successfull push messages from queue
*/
- public long getNrOfRequests() {
- return nrOfRequests;
- }
+ private long outQueueCounter = 0;
/**
- * @return Returns the totalBytes.
+ * Current number of bytes from all queued messages
*/
- public long getTotalBytes() {
- return totalBytes;
- }
+ private long queuedNrOfBytes = 0;
+ // ------------------------------------------------------------- Constructor
+
+ /**
+ * start background thread to push incomming cluster messages to replication
+ * node
+ *
+ * @param host replication node tcp address
+ * @param port replication node tcp port
+ */
public AsyncSocketSender(InetAddress host, int port) {
- this.address = host;
- this.port = port;
+ super(host, port);
checkThread();
- if (log.isInfoEnabled())
- log.info("Started async sender thread for TCP replication.");
+ long a = Long.MAX_VALUE;
}
- public InetAddress getAddress() {
- return address;
- }
+ // ------------------------------------------------------------- Properties
- public int getPort() {
- return port;
- }
+ /**
+ * Return descriptive information about this implementation and the
+ * corresponding version number, in the format
+ * <code><description>/<version></code>.
+ */
+ public String getInfo() {
- public void connect() throws java.io.IOException {
- sc = new Socket(getAddress(), getPort());
- isSocketConnected = true;
- checkThread();
+ return (info);
}
- protected void checkThread() {
- if (queueThread == null) {
- queueThread = new QueueThread(this);
- queueThread.setDaemon(true);
- queueThread.start();
- }
+ /**
+ * @return Returns the inQueueCounter.
+ */
+ public long getInQueueCounter() {
+ return inQueueCounter;
}
- public void disconnect() {
- try {
- sc.close();
- } catch (Exception x) {
- }
- isSocketConnected = false;
- if (queueThread != null) {
- queueThread.stopRunning();
- queueThread = null;
- }
+ /**
+ * @return Returns the outQueueCounter.
+ */
+ public long getOutQueueCounter() {
+ return outQueueCounter;
+ }
+ /**
+ * @return Returns the queueSize.
+ */
+ public int getQueueSize() {
+ return queue.size();
}
- public boolean isConnected() {
- return isSocketConnected;
+ /**
+ * @return Returns the queuedNrOfBytes.
+ */
+ public long getQueuedNrOfBytes() {
+ return queuedNrOfBytes;
}
- public int getQueueSize() {
- return queue.size();
+ // --------------------------------------------------------- Public Methods
+
+ /*
+ * Connect to socket and start background thread to ppush queued messages
+ *
+ * @see org.apache.catalina.cluster.tcp.IDataSender#connect()
+ */
+ public void connect() throws java.io.IOException {
+ super.connect();
+ checkThread();
}
/**
- * Blocking send
+ * Disconnect socket ad stop queue thread
*
- * @param data
- * @throws java.io.IOException
+ * @see org.apache.catalina.cluster.tcp.IDataSender#disconnect()
*/
- private synchronized void sendMessage(byte[] data)
- throws java.io.IOException {
- if (!isConnected())
- connect();
- try {
- sc.getOutputStream().write(data);
- sc.getOutputStream().flush();
- } catch (java.io.IOException x) {
- disconnect();
- connect();
- sc.getOutputStream().write(data);
- sc.getOutputStream().flush();
- }
- addStats(data.length);
+ public void disconnect() {
+ stopThread();
+ super.disconnect();
}
- public synchronized void sendMessage(String sessionId, byte[] data)
+ /*
+ * Send message to queue for later sending
+ *
+ * @see org.apache.catalina.cluster.tcp.IDataSender#sendMessage(java.lang.String,
+ * byte[])
+ */
+ public synchronized void sendMessage(String messageid, byte[] data)
throws java.io.IOException {
- SmartQueue.SmartEntry entry = new SmartQueue.SmartEntry(sessionId, data);
+ SmartQueue.SmartEntry entry = new SmartQueue.SmartEntry(messageid, data);
queue.add(entry);
+ inQueueCounter++;
+ queuedNrOfBytes += data.length;
+ if (log.isTraceEnabled())
+ log.trace(sm.getString("AsyncSocketSender.queue.message",
+ getAddress(), new Integer(getPort()), messageid, new Long(
+ data.length)));
+ }
+
+ /*
+ * Reset sender statistics
+ */
+ public synchronized void resetStatistics() {
+ super.resetStatistics();
+ inQueueCounter = queue.size();
+ outQueueCounter = 0;
+
}
+ /**
+ * Name of this SockerSender
+ */
public String toString() {
StringBuffer buf = new StringBuffer("AsyncSocketSender[");
buf.append(getAddress()).append(":").append(getPort()).append("]");
return buf.toString();
}
- public boolean isSuspect() {
- return suspect;
- }
+ // --------------------------------------------------------- Public Methods
- public boolean getSuspect() {
- return suspect;
+ /**
+ * Start Queue thread as daemon
+ */
+ protected void checkThread() {
+ if (queueThread == null) {
+ if (log.isInfoEnabled())
+ log.info(sm.getString("AsyncSocketSender.create.thread",
+ getAddress(), new Integer(getPort())));
+ queueThread = new QueueThread(this);
+ queueThread.setDaemon(true);
+ queueThread.start();
+ }
}
- public void setSuspect(boolean suspect) {
- this.suspect = suspect;
+ /**
+ * stop queue worker thread
+ */
+ protected void stopThread() {
+ if (queueThread != null) {
+ queueThread.stopRunning();
+ queueThread = null;
+ }
}
- public long getAckTimeout() {
- return ackTimeout;
+ /*
+ * Reduce queued message date size counter
+ */
+ protected void reduceQueuedCounter(int size) {
+ queuedNrOfBytes -= size;
}
- public void setAckTimeout(long ackTimeout) {
- this.ackTimeout = ackTimeout;
- }
+ // -------------------------------------------------------- Inner Class
private class QueueThread extends Thread {
AsyncSocketSender sender;
@@ -199,16 +241,26 @@
keepRunning = false;
}
+ /**
+ * Get one queued message and push it to the replication node
+ *
+ * @see DataSender#pushMessage(String, byte[])
+ */
public void run() {
while (keepRunning) {
SmartQueue.SmartEntry entry = sender.queue.remove(5000);
if (entry != null) {
+ int messagesize = 0;
try {
byte[] data = (byte[]) entry.getValue();
- sender.sendMessage(data);
+ messagesize = data.length;
+ sender.pushMessage((String) entry.getKey(), data);
+ outQueueCounter++;
} catch (Exception x) {
- log.warn("Unable to asynchronously send session w/ id="
- + entry.getKey() + " message will be ignored.");
+ log.warn(sm.getString("AsyncSocketSender.send.error",
+ entry.getKey()));
+ } finally {
+ reduceQueuedCounter(messagesize);
}
}
}
1.6 +4 -0 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSender.java
Index: IDataSender.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSender.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- IDataSender.java 29 Sep 2004 18:23:55 -0000 1.5
+++ IDataSender.java 15 Feb 2005 09:31:45 -0000 1.6
@@ -36,4 +36,8 @@
public void setSuspect(boolean suspect);
public boolean getSuspect();
public void setAckTimeout(long timeout);
+ public long getAckTimeout();
+ public boolean isWaitForAck();
+ public void setWaitForAck(boolean isWaitForAck);
+
}
1.9 +117 -133 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java
Index: PooledSocketSender.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- PooledSocketSender.java 27 Dec 2004 09:30:36 -0000 1.8
+++ PooledSocketSender.java 15 Feb 2005 09:31:45 -0000 1.9
@@ -15,8 +15,9 @@
*/
package org.apache.catalina.cluster.tcp;
-import java.net.InetAddress ;
-import java.net.Socket;
+
+import java.io.IOException;
+import java.net.InetAddress;
import java.util.LinkedList;
/**
@@ -24,129 +25,99 @@
*
* @author Filip Hanik
* @author Peter Rossbach
- * @version 1.1
+ * @version 1.2
*/
+public class PooledSocketSender extends DataSender {
+
+ private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
+ .getLog(org.apache.catalina.cluster.tcp.PooledSocketSender.class);
-public class PooledSocketSender implements IDataSender
-{
+ /**
+ * The descriptive information about this implementation.
+ */
+ private static final String info = "PooledSocketSender/1.2";
- private static org.apache.commons.logging.Log log =
- org.apache.commons.logging.LogFactory.getLog( org.apache.catalina.cluster.CatalinaCluster.class );
+ // ----------------------------------------------------- Instance Variables
- private InetAddress address;
- private int port;
- private Socket sc = null;
- private boolean isSocketConnected = true;
- private boolean suspect;
- private long ackTimeout = 15*1000; //15 seconds socket read timeout (for acknowledgement)
- private long keepAliveTimeout = 60*1000; //keep socket open for no more than one min
- private int keepAliveMaxRequestCount = 100; //max 100 requests before reconnecting
- private long keepAliveConnectTime = 0;
- private int keepAliveCount = 0;
private int maxPoolSocketLimit = 25;
private SenderQueue senderQueue = null;
- private long nrOfRequests = 0;
- private long totalBytes = 0;
-
- public PooledSocketSender(InetAddress host, int port)
- {
- this.address = host;
- this.port = port;
- senderQueue = new SenderQueue(this,maxPoolSocketLimit);
- }
-
- private synchronized void addStats(int length) {
- nrOfRequests++;
- totalBytes += length;
- if (log.isDebugEnabled() && (nrOfRequests % 100) == 0) {
- log.debug("Send stats from " + getAddress().getHostAddress() + ":" + getPort()
- + "Nr of bytes sent=" + totalBytes + " over "
- + nrOfRequests + " ==" + (totalBytes / nrOfRequests)
- + " bytes/request");
- }
+ // ----------------------------------------------------- Constructor
+ public PooledSocketSender(InetAddress host, int port) {
+ super(host, port);
+ senderQueue = new SenderQueue(this, maxPoolSocketLimit);
}
- /**
- * @return Returns the nrOfRequests.
- */
- public long getNrOfRequests() {
- return nrOfRequests;
- }
+ // ----------------------------------------------------- Public Properties
/**
- * @return Returns the totalBytes.
+ * Return descriptive information about this implementation and the
+ * corresponding version number, in the format
+ * <code><description>/<version></code>.
*/
- public long getTotalBytes() {
- return totalBytes;
- }
+ public String getInfo() {
+ return (info);
- public InetAddress getAddress()
- {
- return address;
- }
-
- public int getPort()
- {
- return port;
}
- public void connect() throws java.io.IOException
- {
- //do nothing, happens in the socket sender itself
- senderQueue.open();
- isSocketConnected = true;
+ public void setMaxPoolSocketLimit(int limit) {
+ maxPoolSocketLimit = limit;
+ senderQueue.setLimit(limit);
}
- public void disconnect()
- {
- senderQueue.close();
- isSocketConnected = false;
+ public int getMaxPoolSocketLimit() {
+ return maxPoolSocketLimit;
}
- public boolean isConnected()
- {
- return isSocketConnected;
+ public int getInPoolSize() {
+ return senderQueue.getInPoolSize();
}
- public void setAckTimeout(long timeout) {
- this.ackTimeout = timeout;
+ public int getInUsePoolSize() {
+ return senderQueue.getInUsePoolSize();
}
- public long getAckTimeout() {
- return ackTimeout;
- }
+ // ----------------------------------------------------- Public Methode
- public void setMaxPoolSocketLimit(int limit) {
- maxPoolSocketLimit = limit;
+ public void connect() throws java.io.IOException {
+ //do nothing, happens in the socket sender itself
+ senderQueue.open();
+ setSocketConnected(true);
+ connectCounter++;
}
- public int getMaxPoolSocketLimit() {
- return maxPoolSocketLimit;
+ public void disconnect() {
+ senderQueue.close();
+ setSocketConnected(false);
+ disconnectCounter++;
}
-
/**
- * Blocking send
- * @param data
+ * send Message and use a pool of SocketSenders
+ *
+ * @param messageId Message unique identifier
+ * @param data Message data
* @throws java.io.IOException
*/
- public void sendMessage(String sessionId, byte[] data) throws java.io.IOException
- {
+ public void sendMessage(String messageId, byte[] data) throws IOException {
//get a socket sender from the pool
SocketSender sender = senderQueue.getSender(0);
- if ( sender == null ) {
- log.warn("No socket sender available for client="+this.getAddress()+":"+this.getPort()+" did it disappear?");
+ if (sender == null) {
+ log.warn(sm.getString("PoolSocketSender.noMoreSender", this
+ .getAddress(), new Integer(this.getPort())));
return;
- }//end if
+ }
//send the message
- sender.sendMessage(sessionId,data);
- //return the connection to the pool
- senderQueue.returnSender(sender);
+ try {
+ sender.sendMessage(messageId, data);
+ } finally {
+ //return the connection to the pool
+ senderQueue.returnSender(sender);
+ }
addStats(data.length);
}
@@ -156,46 +127,19 @@
return buf.toString();
}
- public boolean getSuspect() {
- return suspect;
- }
-
- public void setSuspect(boolean suspect) {
- this.suspect = suspect;
- }
-
- public long getKeepAliveTimeout() {
- return keepAliveTimeout;
- }
- public void setKeepAliveTimeout(long keepAliveTimeout) {
- this.keepAliveTimeout = keepAliveTimeout;
- }
- public int getKeepAliveMaxRequestCount() {
- return keepAliveMaxRequestCount;
- }
- public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) {
- this.keepAliveMaxRequestCount = keepAliveMaxRequestCount;
- }
-
- /**
- * @return Returns the keepAliveConnectTime.
- */
- public long getKeepAliveConnectTime() {
- return keepAliveConnectTime;
- }
- /**
- * @return Returns the keepAliveCount.
- */
- public int getKeepAliveCount() {
- return keepAliveCount;
- }
+ // ----------------------------------------------------- Inner Class
private class SenderQueue {
private int limit = 25;
+
PooledSocketSender parent = null;
+
private LinkedList queue = new LinkedList();
+
private LinkedList inuse = new LinkedList();
+
private Object mutex = new Object();
+
private boolean isOpen = true;
public SenderQueue(PooledSocketSender parent, int limit) {
@@ -203,30 +147,67 @@
this.parent = parent;
}
+ /**
+ * @return Returns the limit.
+ */
+ public int getLimit() {
+ return limit;
+ }
+ /**
+ * @param limit The limit to set.
+ */
+ public void setLimit(int limit) {
+ this.limit = limit;
+ }
+ /**
+ * @return
+ */
+ public int getInUsePoolSize() {
+ return inuse.size();
+ }
+
+ /**
+ * @return
+ */
+ public int getInPoolSize() {
+ return queue.size();
+ }
+
public SocketSender getSender(long timeout) {
SocketSender sender = null;
long start = System.currentTimeMillis();
long delta = 0;
do {
synchronized (mutex) {
- if ( !isOpen ) throw new IllegalStateException("Socket pool is closed.");
- if ( queue.size() > 0 ) {
+ if (!isOpen)
+ throw new IllegalStateException(
+ "Socket pool is closed.");
+ if (queue.size() > 0) {
sender = (SocketSender) queue.removeFirst();
- } else if ( inuse.size() < limit ) {
+ } else if (inuse.size() < limit) {
sender = getNewSocketSender();
} else {
try {
mutex.wait(timeout);
- }catch ( Exception x ) {
- PooledSocketSender.log.warn("PoolSocketSender.senderQueue.getSender failed",x);
+ } catch (Exception x) {
+ PooledSocketSender.log
+ .warn(
+ sm
+ .getString(
+ "PoolSocketSender.senderQueue.sender.failed",
+ parent.getAddress(),
+ new Integer(parent
+ .getPort())),
+ x);
}//catch
}//end if
- if ( sender != null ) {
+ if (sender != null) {
inuse.add(sender);
}
}//synchronized
delta = System.currentTimeMillis() - start;
- } while ( (isOpen) && (sender == null) && (timeout==0?true:(delta<timeout)) );
+ } while ((isOpen) && (sender == null)
+ && (timeout == 0 ? true : (delta < timeout)));
//to do
return sender;
}
@@ -242,21 +223,24 @@
private SocketSender getNewSocketSender() {
//new SocketSender(
- SocketSender sender = new SocketSender(parent.getAddress(),parent.getPort());
- sender.setKeepAliveMaxRequestCount(parent.getKeepAliveMaxRequestCount());
+ SocketSender sender = new SocketSender(parent.getAddress(), parent
+ .getPort());
+ sender.setKeepAliveMaxRequestCount(parent
+ .getKeepAliveMaxRequestCount());
sender.setKeepAliveTimeout(parent.getKeepAliveTimeout());
sender.setAckTimeout(parent.getAckTimeout());
+ sender.setWaitForAck(parent.isWaitForAck());
return sender;
}
public void close() {
synchronized (mutex) {
- for ( int i=0; i<queue.size(); i++ ) {
- SocketSender sender = (SocketSender)queue.get(i);
+ for (int i = 0; i < queue.size(); i++) {
+ SocketSender sender = (SocketSender) queue.get(i);
sender.disconnect();
}//for
- for ( int i=0; i<inuse.size(); i++ ) {
+ for (int i = 0; i < inuse.size(); i++) {
SocketSender sender = (SocketSender) inuse.get(i);
sender.disconnect();
}//for
@@ -266,7 +250,7 @@
mutex.notifyAll();
}
}
-
+
public void open() {
synchronized (mutex) {
isOpen = true;
@@ -274,4 +258,4 @@
}
}
}
-}
+}
\ No newline at end of file
1.19 +6 -6 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java
Index: ReplicationListener.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java,v
retrieving revision 1.18
retrieving revision 1.19
diff -u -r1.18 -r1.19
--- ReplicationListener.java 27 Dec 2004 09:30:36 -0000 1.18
+++ ReplicationListener.java 15 Feb 2005 09:31:45 -0000 1.19
@@ -45,7 +45,7 @@
private int tcpThreadCount;
private long tcpSelectorTimeout;
private int tcpListenPort;
- private boolean isSenderSynchronized;
+ private boolean waitForAck;
private Selector selector = null;
private Object interestOpsMutex = new Object();
@@ -221,7 +221,7 @@
return;
} else {
// invoking this wakes up the worker thread then returns
- worker.serviceChannel(key, isSenderSynchronized);
+ worker.serviceChannel(key, waitForAck);
return;
}
}
@@ -249,11 +249,11 @@
public void setTcpThreadCount(int tcpThreadCount) {
this.tcpThreadCount = tcpThreadCount;
}
- public boolean getIsSenderSynchronized() {
- return isSenderSynchronized;
+ public boolean isWaitForAck() {
+ return waitForAck;
}
- public void setIsSenderSynchronized(boolean isSenderSynchronized) {
- this.isSenderSynchronized = isSenderSynchronized;
+ public void setWaitForAck(boolean waitForAck) {
+ this.waitForAck = waitForAck;
}
public String getHost() {
1.58 +2 -3 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java
Index: SimpleTcpCluster.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java,v
retrieving revision 1.57
retrieving revision 1.58
diff -u -r1.57 -r1.58
--- SimpleTcpCluster.java 27 Dec 2004 09:30:36 -0000 1.57
+++ SimpleTcpCluster.java 15 Feb 2005 09:31:45 -0000 1.58
@@ -389,8 +389,7 @@
}
registerMBeans();
- clusterReceiver.setIsSenderSynchronized(clusterSender
- .getIsSenderSynchronized());
+ clusterReceiver.setWaitForAck(clusterSender.isWaitForAck());
clusterReceiver.setCatalinaCluster(this);
clusterReceiver.start();
clusterSender.setCatalinaCluster(this);
1.15 +18 -187 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java
Index: SocketSender.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -r1.14 -r1.15
--- SocketSender.java 27 Dec 2004 09:30:36 -0000 1.14
+++ SocketSender.java 15 Feb 2005 09:31:45 -0000 1.15
@@ -15,176 +15,42 @@
*/
package org.apache.catalina.cluster.tcp;
-import java.net.InetAddress ;
-import java.net.Socket;
+
+import java.net.InetAddress;
/**
* Send cluster messages sync to request with only one socket.
*
* @author Filip Hanik
* @author Peter Rossbach
- * @version 1.1
+ * @version 1.2
*/
-public class SocketSender implements IDataSender
-{
-
- private static org.apache.commons.logging.Log log =
- org.apache.commons.logging.LogFactory.getLog( SocketSender.class );
-
- private InetAddress address;
- private int port;
- private Socket sc = null;
- private boolean isSocketConnected = false;
- /**
- * Flag socket as suspect
- */
- private boolean suspect;
- /**
- * 15 seconds socket read timeout (for acknowledgement)
- */
- private long ackTimeout = 15*1000;
- /**
- * keep socket open for no more than one min
- */
- private long keepAliveTimeout = 60*1000;
-
- /**
- * max 100 requests before reconnecting
- */
- private int keepAliveMaxRequestCount = 100;
-
- private long keepAliveConnectTime = 0;
- private int keepAliveCount = 0;
-
- private long nrOfRequests = 0;
-
- private long totalBytes = 0;
-
- private synchronized void addStats(int length) {
- nrOfRequests++;
- totalBytes += length;
- if (log.isDebugEnabled() && (nrOfRequests % 100) == 0) {
- log.debug("Send stats from " + getAddress().getHostAddress() + ":" + getPort()
- + "Nr of bytes sent=" + totalBytes + " over "
- + nrOfRequests + " ==" + (totalBytes / nrOfRequests)
- + " bytes/request");
- }
-
- }
-
- /**
- * get number of messages that send
- * @return Returns the nrOfRequests.
- */
- public long getNrOfRequests() {
- return nrOfRequests;
- }
+public class SocketSender extends DataSender {
+ // ----------------------------------------------------- Instance Variables
/**
- * get total num bytes send with this socket.
- * @return Returns the totalBytes.
+ * The descriptive information about this implementation.
*/
- public long getTotalBytes() {
- return totalBytes;
- }
-
- public SocketSender(InetAddress host, int port)
- {
- this.address = host;
- this.port = port;
- }
-
- public InetAddress getAddress()
- {
- return address;
- }
-
- public int getPort()
- {
- return port;
- }
-
- public void connect() throws java.io.IOException
- {
- sc = new Socket(getAddress(),getPort());
- sc.setSoTimeout((int)ackTimeout);
- isSocketConnected = true;
- this.keepAliveCount = 0;
- this.keepAliveConnectTime = System.currentTimeMillis();
- }
-
- public void disconnect()
- {
- try
- {
- sc.close();
- }catch ( Exception x)
- {}
- isSocketConnected = false;
- }
+ private static final String info = "SocketSender/1.2";
- public boolean isConnected()
- {
- return isSocketConnected;
- }
-
- public void checkIfDisconnect() {
- long ctime = System.currentTimeMillis() - this.keepAliveConnectTime;
- if ( (ctime > this.keepAliveTimeout) ||
- (this.keepAliveCount >= this.keepAliveMaxRequestCount) ) {
- disconnect();
- }
- }
+ // ------------------------------------------------------------- Constructor
- public void setAckTimeout(long timeout) {
- this.ackTimeout = timeout;
+ public SocketSender(InetAddress host, int port) {
+ super(host, port);
}
- public long getAckTimeout() {
- return ackTimeout;
- }
+ // ------------------------------------------------------------- Properties
/**
- * send with only one socket at a time
- * @param sessionid unique message id
- * @param data data to send
- * @throws java.io.IOException
+ * Return descriptive information about this implementation and the
+ * corresponding version number, in the format
+ * <code><description>/<version></code>.
*/
- public synchronized void sendMessage(String sessionId, byte[] data) throws java.io.IOException
- {
- checkIfDisconnect();
- if ( !isConnected() ) connect();
- try
- {
- sc.getOutputStream().write(data);
- sc.getOutputStream().flush();
- waitForAck(ackTimeout);
- }
- catch ( java.io.IOException x )
- {
- disconnect();
- connect();
- sc.getOutputStream().write(data);
- sc.getOutputStream().flush();
- waitForAck(ackTimeout);
- }
- this.keepAliveCount++;
- checkIfDisconnect();
- addStats(data.length);
- }
+ public String getInfo() {
+
+ return (info);
- private void waitForAck(long timeout) throws java.io.IOException {
- try {
- int i = sc.getInputStream().read();
- while ( (i != -1) && (i != 3)) {
- i = sc.getInputStream().read();
- }
- } catch (java.net.SocketTimeoutException x ) {
- log.warn("Wasn't able to read acknowledgement from server["+getAddress()+":"+getPort()+"] in "+this.ackTimeout+" ms."+
- " Disconnecting socket, and trying again.");
- throw x;
- }
}
public String toString() {
@@ -192,40 +58,5 @@
buf.append(getAddress()).append(":").append(getPort()).append("]");
return buf.toString();
}
- public boolean isSuspect() {
- return suspect;
- }
-
- public boolean getSuspect() {
- return suspect;
- }
- public void setSuspect(boolean suspect) {
- this.suspect = suspect;
- }
- public long getKeepAliveTimeout() {
- return keepAliveTimeout;
- }
- public void setKeepAliveTimeout(long keepAliveTimeout) {
- this.keepAliveTimeout = keepAliveTimeout;
- }
- public int getKeepAliveMaxRequestCount() {
- return keepAliveMaxRequestCount;
- }
- public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) {
- this.keepAliveMaxRequestCount = keepAliveMaxRequestCount;
- }
-
- /**
- * @return Returns the keepAliveConnectTime.
- */
- public long getKeepAliveConnectTime() {
- return keepAliveConnectTime;
- }
- /**
- * @return Returns the keepAliveCount.
- */
- public int getKeepAliveCount() {
- return keepAliveCount;
- }
-}
+}
\ No newline at end of file
1.13 +5 -4 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java
Index: TcpReplicationThread.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- TcpReplicationThread.java 13 Jul 2004 09:43:58 -0000 1.12
+++ TcpReplicationThread.java 15 Feb 2005 09:31:45 -0000 1.13
@@ -39,7 +39,7 @@
org.apache.commons.logging.LogFactory.getLog( TcpReplicationThread.class );
private ByteBuffer buffer = ByteBuffer.allocate (1024);
private SelectionKey key;
- private boolean synchronous=false;
+ private boolean waitForAck=true;
TcpReplicationThread ()
{
@@ -91,10 +91,10 @@
* to ignore read-readiness for this channel while the
* worker thread is servicing it.
*/
- synchronized void serviceChannel (SelectionKey key, boolean synchronous)
+ synchronized void serviceChannel (SelectionKey key, boolean waitForAck)
{
this.key = key;
- this.synchronous=synchronous;
+ this.waitForAck=waitForAck;
key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
this.notify(); // awaken the thread
@@ -125,11 +125,12 @@
//check to see if any data is available
int pkgcnt = reader.execute();
while ( pkgcnt > 0 ) {
- if (synchronous) {
+ if (waitForAck) {
sendAck(key,channel);
} //end if
pkgcnt--;
}
+
if (count < 0) {
// close channel on EOF, invalidates the key
channel.close();
1.1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java
Index: DataSender.java
===================================================================
/*
* Copyright 1999,2005 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.catalina.cluster.tcp;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
import org.apache.catalina.util.StringManager;
/**
* Send cluster messages with only one socket. Ack and keep Alive Handling is
* supported
*
* @author Peter Rossbach
* @author Filip Hanik
* @version 1.2
*/
public class DataSender implements IDataSender {
private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
.getLog(DataSender.class);
/**
* The string manager for this package.
*/
protected static StringManager sm = StringManager
.getManager(Constants.Package);
// ----------------------------------------------------- Instance Variables
/**
* The descriptive information about this implementation.
*/
private static final String info = "DataSender/1.2";
private InetAddress address;
private int port;
private Socket sc = null;
private boolean isSocketConnected = false;
private boolean suspect;
private long ackTimeout;
protected long nrOfRequests = 0;
protected long totalBytes = 0;
protected long connectCounter = 0;
protected long disconnectCounter = 0;
protected long missingAckCounter = 0;
protected long dataResendCounter = 0;
/**
* keep socket open for no more than one min
*/
private long keepAliveTimeout = 60 * 1000;
/**
* max 100 requests before reconnecting
*/
private int keepAliveMaxRequestCount = 100;
/**
* Last connect timestamp
*/
private long keepAliveConnectTime = 0;
/**
* keepalive counter
*/
private int keepAliveCount = 0;
private boolean waitForAck = true;
private int socketCloseCounter;
private int socketOpenCounter;
// ------------------------------------------------------------- Constructor
public DataSender(InetAddress host, int port) {
this.address = host;
this.port = port;
if (log.isInfoEnabled())
log.info(sm.getString("IDataSender.create", address, new Integer(
port)));
}
// ------------------------------------------------------------- Properties
/**
* Return descriptive information about this implementation and the
* corresponding version number, in the format
* <code><description>/<version></code>.
*/
public String getInfo() {
return (info);
}
/**
* @return Returns the nrOfRequests.
*/
public long getNrOfRequests() {
return nrOfRequests;
}
/**
* @return Returns the totalBytes.
*/
public long getTotalBytes() {
return totalBytes;
}
/**
* @return Returns the connectCounter.
*/
public long getConnectCounter() {
return connectCounter;
}
/**
* @return Returns the disconnectCounter.
*/
public long getDisconnectCounter() {
return disconnectCounter;
}
/**
* @return Returns the missingAckCounter.
*/
public long getMissingAckCounter() {
return missingAckCounter;
}
/**
* @return Returns the socketOpenCounter.
*/
public int getSocketOpenCounter() {
return socketOpenCounter;
}
/**
* @return Returns the socketCloseCounter.
*/
public int getSocketCloseCounter() {
return socketCloseCounter;
}
/**
* @return Returns the dataResendCounter.
*/
public long getDataResendCounter() {
return dataResendCounter;
}
public InetAddress getAddress() {
return address;
}
public int getPort() {
return port;
}
public boolean isConnected() {
return isSocketConnected;
}
/**
* @param isSocketConnected
* The isSocketConnected to set.
*/
protected void setSocketConnected(boolean isSocketConnected) {
this.isSocketConnected = isSocketConnected;
}
public boolean isSuspect() {
return suspect;
}
public boolean getSuspect() {
return suspect;
}
public void setSuspect(boolean suspect) {
this.suspect = suspect;
}
public long getAckTimeout() {
return ackTimeout;
}
public void setAckTimeout(long ackTimeout) {
this.ackTimeout = ackTimeout;
}
public long getKeepAliveTimeout() {
return keepAliveTimeout;
}
public void setKeepAliveTimeout(long keepAliveTimeout) {
this.keepAliveTimeout = keepAliveTimeout;
}
public int getKeepAliveMaxRequestCount() {
return keepAliveMaxRequestCount;
}
public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) {
this.keepAliveMaxRequestCount = keepAliveMaxRequestCount;
}
/**
* @return Returns the keepAliveConnectTime.
*/
public long getKeepAliveConnectTime() {
return keepAliveConnectTime;
}
/**
* @return Returns the keepAliveCount.
*/
public int getKeepAliveCount() {
return keepAliveCount;
}
/**
* @return Returns the waitForAck.
*/
public boolean isWaitForAck() {
return waitForAck;
}
/**
* @param waitForAck
* The waitForAck to set.
*/
public void setWaitForAck(boolean waitForAck) {
this.waitForAck = waitForAck;
}
// --------------------------------------------------------- Public Methods
public void connect() throws java.io.IOException {
connectCounter++;
if (log.isDebugEnabled())
log.debug(sm.getString("IDataSender.connect", address,
new Integer(port)));
openSocket();
}
/**
* close socket
*
* @see org.apache.catalina.cluster.tcp.IDataSender#disconnect()
* @see DataSender#closeSocket()
*/
public void disconnect() {
disconnectCounter++;
if (log.isDebugEnabled())
log.debug(sm.getString("IDataSender.disconnect", address,
new Integer(port)));
closeSocket();
}
/**
* Check, if time to close socket! Important for AsyncSocketSender that
* replication thread is not fork again! <b>Only work when keepAliveTimeout
* or keepAliveMaxRequestCount greater -1 </b>
* @return true, is socket close
* @see DataSender#closeSocket()
*/
public boolean checkIfCloseSocket() {
boolean isCloseSocket = true ;
long ctime = System.currentTimeMillis() - this.keepAliveConnectTime;
if ((keepAliveTimeout > -1 && ctime > this.keepAliveTimeout)
|| (keepAliveMaxRequestCount > -1 && this.keepAliveCount >= this.keepAliveMaxRequestCount)) {
closeSocket();
} else
isCloseSocket = false ;
return isCloseSocket;
}
/*
* Send message
*
* @see org.apache.catalina.cluster.tcp.IDataSender#sendMessage(java.lang.String,
* byte[])
*/
public synchronized void sendMessage(String messageid, byte[] data)
throws java.io.IOException {
pushMessage(messageid, data);
}
/*
* Reset sender statistics
*/
public synchronized void resetStatistics() {
nrOfRequests = 0;
totalBytes = 0;
disconnectCounter = 0;
connectCounter = isConnected() ? 1 : 0;
missingAckCounter = 0;
dataResendCounter = 0;
socketOpenCounter =isConnected() ? 1 : 0;
socketCloseCounter = 0;
}
/**
* Name of this SockerSender
*/
public String toString() {
StringBuffer buf = new StringBuffer("DataSender[");
buf.append(getAddress()).append(":").append(getPort()).append("]");
return buf.toString();
}
// --------------------------------------------------------- Protected
// Methods
/**
* @throws IOException
* @throws SocketException
*/
protected void openSocket() throws IOException, SocketException {
socketOpenCounter++;
if (log.isDebugEnabled())
log.debug(sm.getString("IDataSender.openSocket", address, new Integer(
port)));
sc = new Socket(getAddress(), getPort());
if (isWaitForAck())
sc.setSoTimeout((int) ackTimeout);
isSocketConnected = true;
this.keepAliveCount = 0;
this.keepAliveConnectTime = System.currentTimeMillis();
}
/**
* close socket
*
* @see DataSender#disconnect()
* @see DataSender#checkIfCloseSocket()
*/
protected void closeSocket() {
if(isSocketConnected) {
socketCloseCounter++;
if (log.isDebugEnabled())
log.debug(sm.getString("IDataSender.socketclose",
address, new Integer(port)));
try {
sc.close();
} catch (Exception x) {
}
isSocketConnected = false;
}
}
/**
* Add statistic for this socket instance
*
* @param length
*/
protected void addStats(int length) {
nrOfRequests++;
totalBytes += length;
if (log.isDebugEnabled() && (nrOfRequests % 100) == 0) {
log.debug(sm.getString("IDataSender.stats", new Object[] {
getAddress().getHostAddress(), new Integer(getPort()),
new Long(totalBytes), new Long(nrOfRequests),
new Long(totalBytes / nrOfRequests) }));
}
}
/**
* push messages with only one socket at a time
*
* @param messageid
* unique message id
* @param data
* data to send
* @throws java.io.IOException
*/
protected synchronized void pushMessage(String messageid, byte[] data)
throws java.io.IOException {
checkIfCloseSocket();
if (!isConnected())
openSocket();
try {
sc.getOutputStream().write(data);
sc.getOutputStream().flush();
if (isWaitForAck())
waitForAck(ackTimeout);
} catch (java.io.IOException x) {
// second try with fresh connection
dataResendCounter++;
if (log.isTraceEnabled())
log.trace(sm.getString("IDataSender.send.again", address,
new Integer(port)));
closeSocket();
openSocket();
sc.getOutputStream().write(data);
sc.getOutputStream().flush();
if (isWaitForAck())
waitForAck(ackTimeout);
}
this.keepAliveCount++;
checkIfCloseSocket();
addStats(data.length);
if (log.isTraceEnabled())
log.trace(sm.getString("IDataSender.send.message", address,
new Integer(port), messageid, new Long(data.length)));
}
/**
* Wait for Acknowledgement from other server
*
* @param timeout
* @throws java.io.IOException
*/
protected void waitForAck(long timeout) throws java.io.IOException {
try {
int i = sc.getInputStream().read();
while ((i != -1) && (i != 3)) {
i = sc.getInputStream().read();
}
} catch (java.net.SocketTimeoutException x) {
missingAckCounter++;
log.warn(sm.getString("IDataSender.missing.ack", getAddress(),
new Integer(getPort()), new Long(this.ackTimeout)));
throw x;
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: tomcat-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: tomcat-dev-help@jakarta.apache.org
Re: cvs commit:
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp DataSender.java
AsyncSocketSender.java IDataSender.java PooledSocketSender.java
ReplicationListener.java SimpleTcpCluster.java SocketSender.java TcpReplicatio
Posted by shubham <pt...@sancharnet.in>.
Dengerous virus is entering to your computer through this message do not
reply!
----- Original Message -----
From: <pe...@apache.org>
To: <ja...@apache.org>
Sent: Tuesday, February 15, 2005 3:01 PM
Subject: cvs commit:
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluste
r/tcp DataSender.java AsyncSocketSender.java IDataSender.java
PooledSocketSender.java ReplicationListener.java SimpleTcpCluster.java
SocketSender.java TcpReplicationThread.java
---------------------------------------------------------------------
To unsubscribe, e-mail: tomcat-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: tomcat-dev-help@jakarta.apache.org