You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by pe...@apache.org on 2005/03/25 23:10:25 UTC
cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp ReplicationListener.java
pero 2005/03/25 14:10:25
Modified: modules/cluster/src/share/org/apache/catalina/cluster/tcp
ReplicationListener.java
Log:
Change attribute name waitForAck to sendAck
Add compress/uncompress message data transfer
Update some documentation
Revision Changes Path
1.20 +74 -24 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java
Index: ReplicationListener.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java,v
retrieving revision 1.19
retrieving revision 1.20
diff -u -r1.19 -r1.20
--- ReplicationListener.java 15 Feb 2005 09:31:45 -0000 1.19
+++ ReplicationListener.java 25 Mar 2005 22:10:25 -0000 1.20
@@ -17,26 +17,44 @@
package org.apache.catalina.cluster.tcp;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
-import java.nio.channels.Selector;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SelectableChannel;
-
-import java.net.ServerSocket;
-import java.net.InetSocketAddress;
import java.util.Iterator;
-import org.apache.catalina.cluster.io.ListenCallback;
-import org.apache.catalina.cluster.io.ObjectReader;
+
import org.apache.catalina.cluster.CatalinaCluster;
import org.apache.catalina.cluster.ClusterReceiver;
+import org.apache.catalina.cluster.tcp.Constants;
+import org.apache.catalina.cluster.io.ListenCallback;
+import org.apache.catalina.cluster.io.ObjectReader;
+import org.apache.catalina.util.StringManager;
/**
- */
+* FIXME i18n log messages
+* FIXME jmx support
+* @author Peter Rossbach
+* @author Filip Hanik
+* @version $Revision$ $Date$
+*/
public class ReplicationListener implements Runnable,ClusterReceiver
{
-
private static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog( ReplicationListener.class );
+
+ /**
+ * The descriptive information about this implementation.
+ */
+ private static final String info = "ReplicationListener/1.1";
+
+ /**
+ * The string manager for this package.
+ */
+ protected StringManager sm = StringManager.getManager(Constants.Package);
+
+
private ThreadPool pool = null;
private boolean doListen = false;
private ListenCallback callback;
@@ -45,7 +63,12 @@
private int tcpThreadCount;
private long tcpSelectorTimeout;
private int tcpListenPort;
- private boolean waitForAck;
+ private boolean sendAck;
+ /**
+ * Compress message data bytes
+ */
+ private boolean compress = true ;
+
private Selector selector = null;
private Object interestOpsMutex = new Object();
@@ -53,6 +76,24 @@
public ReplicationListener() {
}
+ /**
+ * @return Returns the compress.
+ */
+ public boolean isCompress() {
+ return compress;
+ }
+
+ /**
+ * @param compress The compress to set.
+ */
+ public void setCompress(boolean compressMessageData) {
+ this.compress = compressMessageData;
+ }
+
+ /**
+ * start cluster receiver
+ * @see org.apache.catalina.cluster.ClusterReceiver#start()
+ */
public void start() {
try {
pool = new ThreadPool(tcpThreadCount, TcpReplicationThread.class, interestOpsMutex);
@@ -89,6 +130,11 @@
}
}
+ /**
+ * get data from channel and store in byte array
+ * send it to cluster
+ * @throws Exception
+ */
public void listen ()
throws Exception
{
@@ -134,11 +180,12 @@
ServerSocketChannel server =
(ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
+ Object attach = attach = new ObjectReader(channel, selector,
+ callback,isCompress()) ;
registerChannel(selector,
channel,
SelectionKey.OP_READ,
- new ObjectReader(channel, selector,
- callback));
+ attach);
}
// is there data to read on this channel?
if (key.isReadable()) {
@@ -159,7 +206,7 @@
log.error("Unable to process request in ReplicationListener", x);
}
- } //while
+ }
serverChannel.close();
selector.close();
}
@@ -180,7 +227,10 @@
callback = cluster;
}
-
+ public CatalinaCluster getCatalinaCluster() {
+ return (CatalinaCluster)callback ;
+ }
+
// ----------------------------------------------------------
/**
@@ -216,13 +266,13 @@
if (worker == null) {
// No threads available, do nothing, the selection
// loop will keep calling this method until a
- // thread becomes available. This design could
- // be improved.
- return;
+ // thread becomes available.
+ // FIXME: This design could be improved.
+ if(log.isDebugEnabled())
+ log.debug("No TcpReplicationThread available");
} else {
// invoking this wakes up the worker thread then returns
- worker.serviceChannel(key, waitForAck);
- return;
+ worker.serviceChannel(key, sendAck);
}
}
public String getTcpListenAddress() {
@@ -249,11 +299,11 @@
public void setTcpThreadCount(int tcpThreadCount) {
this.tcpThreadCount = tcpThreadCount;
}
- public boolean isWaitForAck() {
- return waitForAck;
+ public boolean isSendAck() {
+ return sendAck;
}
- public void setWaitForAck(boolean waitForAck) {
- this.waitForAck = waitForAck;
+ public void setSendAck(boolean sendAck) {
+ this.sendAck = sendAck;
}
public String getHost() {
---------------------------------------------------------------------
To unsubscribe, e-mail: tomcat-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: tomcat-dev-help@jakarta.apache.org