You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2003/12/15 22:33:06 UTC
cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp ReplicationListener.java SimpleTcpCluster.java SocketSender.java TcpReplicationThread.java
fhanik 2003/12/15 13:33:06
Modified: modules/cluster/src/share/org/apache/catalina/cluster/session
SimpleTcpReplicationManager.java
modules/cluster/src/share/org/apache/catalina/cluster/tcp
ReplicationListener.java SimpleTcpCluster.java
SocketSender.java TcpReplicationThread.java
Log:
Implemented true synchronous replication. The system now awaits for an ack from the other server before returning the
requested thread during replication. This is bug 25493
Revision Changes Path
1.17 +4 -4 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java
Index: SimpleTcpReplicationManager.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java,v
retrieving revision 1.16
retrieving revision 1.17
diff -u -r1.16 -r1.17
--- SimpleTcpReplicationManager.java 15 Nov 2003 01:07:23 -0000 1.16
+++ SimpleTcpReplicationManager.java 15 Dec 2003 21:33:06 -0000 1.17
@@ -491,7 +491,7 @@
new SessionMessage(this.getName(),
SessionMessage.EVT_GET_ALL_SESSIONS,
null,
- null);
+ "GET-ALL");
cluster.send(msg, mbr);
log.warn("Manager["+getName()+"], requesting session state from "+mbr+
". This operation will timeout if no session state has been received within "+
1.5 +9 -6 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java
Index: ReplicationListener.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- ReplicationListener.java 18 Apr 2003 02:51:24 -0000 1.4
+++ ReplicationListener.java 15 Dec 2003 21:33:06 -0000 1.5
@@ -91,13 +91,16 @@
private java.net.InetAddress bind;
private int port;
private long timeout = 0;
+ private boolean synchronous = false;
public ReplicationListener(ListenCallback callback,
int poolSize,
java.net.InetAddress bind,
int port,
- long timeout)
+ long timeout,
+ boolean synchronous)
{
try {
+ this.synchronous=synchronous;
pool = new ThreadPool(poolSize, TcpReplicationThread.class);
}catch ( Exception x ) {
log.fatal("Unable to start thread pool for TCP listeners, session replication will fail! msg="+x.getMessage());
@@ -155,7 +158,7 @@
SocketChannel channel = server.accept();
registerChannel (selector,
channel,
- SelectionKey.OP_READ,
+ SelectionKey.OP_READ | SelectionKey.OP_WRITE,
new ObjectReader(channel,selector,callback));
}
// is there data to read on this channel?
@@ -219,6 +222,6 @@
return;
}
// invoking this wakes up the worker thread then returns
- worker.serviceChannel (key);
+ worker.serviceChannel (key,synchronous);
}
}
1.20 +6 -5 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java
Index: SimpleTcpCluster.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java,v
retrieving revision 1.19
retrieving revision 1.20
diff -u -r1.19 -r1.20
--- SimpleTcpCluster.java 16 Nov 2003 22:22:45 -0000 1.19
+++ SimpleTcpCluster.java 15 Dec 2003 21:33:06 -0000 1.20
@@ -480,7 +480,8 @@
this.tcpThreadCount,
this.tcpAddress,
this.tcpPort,
- this.tcpSelectorTimeout);
+ this.tcpSelectorTimeout,
+ "synchronous".equals(this.replicationMode));
mReplicationListener.setDaemon(true);
mReplicationListener.start();
mReplicationTransmitter = new ReplicationTransmitter(new IDataSender[0]);
1.4 +21 -7 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java
Index: SocketSender.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- SocketSender.java 15 Oct 2003 03:21:04 -0000 1.3
+++ SocketSender.java 15 Dec 2003 21:33:06 -0000 1.4
@@ -82,6 +82,7 @@
private Socket sc = null;
private boolean isSocketConnected = false;
private boolean suspect;
+ private long ackTimeout = 10000;
public SocketSender(InetAddress host, int port)
{
@@ -102,6 +103,7 @@
public void connect() throws java.io.IOException
{
sc = new Socket(getAddress(),getPort());
+ sc.setSoTimeout((int)ackTimeout);
isSocketConnected = true;
}
@@ -131,15 +133,27 @@
try
{
sc.getOutputStream().write(data);
+ sc.getOutputStream().flush();
+ waitForAck(ackTimeout);
}
catch ( java.io.IOException x )
{
disconnect();
connect();
sc.getOutputStream().write(data);
+ sc.getOutputStream().flush();
+ waitForAck(ackTimeout);
}
}
-
+
+ private void waitForAck(long timeout) throws java.io.IOException,
+ java.net.SocketTimeoutException {
+ int i = sc.getInputStream().read();
+ while ( (i!=-1) && (i!=3) ) {
+ i = sc.getInputStream().read();
+ }
+ }
+
public String toString() {
StringBuffer buf = new StringBuffer("SocketSender[");
buf.append(getAddress()).append(":").append(getPort()).append("]");
@@ -148,14 +162,14 @@
public boolean isSuspect() {
return suspect;
}
-
+
public boolean getSuspect() {
return suspect;
}
-
+
public void setSuspect(boolean suspect) {
this.suspect = suspect;
}
-}
\ No newline at end of file
+}
1.2 +27 -9 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java
Index: TcpReplicationThread.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- TcpReplicationThread.java 18 Apr 2003 02:51:24 -0000 1.1
+++ TcpReplicationThread.java 15 Dec 2003 21:33:06 -0000 1.2
@@ -88,7 +88,7 @@
org.apache.commons.logging.LogFactory.getLog( SimpleTcpCluster.class );
private ByteBuffer buffer = ByteBuffer.allocate (1024);
private SelectionKey key;
-
+ private boolean synchronous=false;
TcpReplicationThread ()
{
@@ -112,8 +112,9 @@
try {
drainChannel (key);
} catch (Exception e) {
- log.info ("TCP Worker thread in cluster caught '"
- + e + "' closing channel");
+ log.error ("TCP Worker thread in cluster caught '"
+ + e + "' closing channel", e);
+
// close channel and nudge selector
try {
key.channel().close();
@@ -139,9 +140,10 @@
* to ignore read-readiness for this channel while the
* worker thread is servicing it.
*/
- synchronized void serviceChannel (SelectionKey key)
+ synchronized void serviceChannel (SelectionKey key, boolean synchronous)
{
this.key = key;
+ this.synchronous=synchronous;
key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
this.notify(); // awaken the thread
}
@@ -157,6 +159,7 @@
void drainChannel (SelectionKey key)
throws Exception
{
+ boolean packetReceived=false;
SocketChannel channel = (SocketChannel) key.channel();
int count;
buffer.clear(); // make buffer empty
@@ -164,11 +167,19 @@
// loop while data available, channel is non-blocking
while ((count = channel.read (buffer)) > 0) {
buffer.flip(); // make buffer readable
- reader.append(buffer.array(),0,count);
+ if (reader.append(buffer.array(),0,count)) {
+ if (synchronous) {
+ sendAck(key,channel);
+ } //end if
+ }
buffer.clear(); // make buffer empty
}
//check to see if any data is available
- reader.execute();
+ if ( reader.execute() ) {
+ if (synchronous) {
+ sendAck(key,channel);
+ }//end if
+ }//end if
if (count < 0) {
// close channel on EOF, invalidates the key
channel.close();
@@ -178,5 +189,12 @@
key.interestOps (key.interestOps() | SelectionKey.OP_READ);
// cycle the selector so this key is active again
key.selector().wakeup();
+ }
+
+ private void sendAck(SelectionKey key, SocketChannel channel) throws java.io.IOException {
+ //send a reply-acknowledgement
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(new byte[] {6,2,3});
+ channel.write(buf);
+ buf.clear();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: tomcat-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: tomcat-dev-help@jakarta.apache.org