You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2003/04/18 04:51:24 UTC
cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/util SmartQueue.java
fhanik 2003/04/17 19:51:24
Modified: modules/cluster/src/share/org/apache/catalina/cluster/session
SimpleTcpReplicationManager.java
modules/cluster/src/share/org/apache/catalina/cluster/tcp
IDataSender.java ReplicationListener.java
ReplicationTransmitter.java SimpleTcpCluster.java
SocketSender.java ThreadPool.java WorkerThread.java
Added: modules/cluster/src/share/org/apache/catalina/cluster/tcp
AsyncSocketSender.java IDataSenderFactory.java
TcpReplicationThread.java
modules/cluster/src/share/org/apache/catalina/cluster/util
SmartQueue.java
Log:
added in support for asynchronous replication
Revision Changes Path
1.8 +3 -15 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java
Index: SimpleTcpReplicationManager.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- SimpleTcpReplicationManager.java 16 Apr 2003 18:49:22 -0000 1.7
+++ SimpleTcpReplicationManager.java 18 Apr 2003 02:51:24 -0000 1.8
@@ -554,18 +554,6 @@
}
}
- public IDataSender createDataSender(Member addr) {
- try {
- Member mbr = addr;
- return new SocketSender(InetAddress.getByName(mbr.getHost()),mbr.getPort());
- } catch ( Exception x ){
- log("Unable to create a socket for replication.",x);
- }
- return null;
-
- }
-
-
public void messageDataReceived(SessionMessage msg) {
try {
messageReceived(msg, msg.getAddress()!=null?(Member)msg.getAddress():null);
1.2 +4 -4 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSender.java
Index: IDataSender.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSender.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- IDataSender.java 19 Feb 2003 20:32:11 -0000 1.1
+++ IDataSender.java 18 Apr 2003 02:51:24 -0000 1.2
@@ -78,6 +78,6 @@
public int getPort();
public void connect() throws java.io.IOException;
public void disconnect();
- public void sendMessage(byte[] data) throws java.io.IOException;
+ public void sendMessage(String sessionId, byte[] data) throws java.io.IOException;
public boolean isConnected();
}
1.4 +9 -5 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java
Index: ReplicationListener.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- ReplicationListener.java 3 Apr 2003 02:29:38 -0000 1.3
+++ ReplicationListener.java 18 Apr 2003 02:51:24 -0000 1.4
@@ -97,7 +97,11 @@
int port,
long timeout)
{
- pool = new ThreadPool(poolSize);
+ try {
+ pool = new ThreadPool(poolSize, TcpReplicationThread.class);
+ }catch ( Exception x ) {
+ log.fatal("Unable to start thread pool for TCP listeners, session replication will fail! msg="+x.getMessage());
+ }
this.callback = callback;
this.bind = bind;
this.port = port;
@@ -206,7 +210,7 @@
protected void readDataFromSocket (SelectionKey key)
throws Exception
{
- WorkerThread worker = pool.getWorker();
+ TcpReplicationThread worker = (TcpReplicationThread)pool.getWorker();
if (worker == null) {
// No threads available, do nothing, the selection
// loop will keep calling this method until a
1.5 +21 -21 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
Index: ReplicationTransmitter.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- ReplicationTransmitter.java 26 Mar 2003 17:44:19 -0000 1.4
+++ ReplicationTransmitter.java 18 Apr 2003 02:51:24 -0000 1.5
@@ -63,14 +63,6 @@
package org.apache.catalina.cluster.tcp;
-/**
- * <p>Title: </p>
- * <p>Description: </p>
- * <p>Copyright: Copyright (c) 2002</p>
- * <p>Company: </p>
- * @author not attributable
- * @version 1.0
- */
import org.apache.catalina.cluster.io.XByteBuffer;
@@ -131,18 +123,29 @@
IDataSender[] result = new IDataSender[v.size()];
return result;
}
+
+ protected void sendMessageData(String sessionId, byte[] data, IDataSender sender) throws java.io.IOException {
+ if ( sender == null ) throw new java.io.IOException("Sender not available. Make sure sender information is available to the ReplicationTransmitter.");
+ try
+ {
+ if (!sender.isConnected())
+ sender.connect();
+ sender.sendMessage(sessionId,data);
+ }catch ( Exception x)
+ {
+ log.warn("Unable to send replicated message, is server down?",x);
+ }
- public void sendMessage(byte[] indata, java.net.InetAddress addr, int port) throws java.io.IOException
+ }
+ public void sendMessage(String sessionId, byte[] indata, java.net.InetAddress addr, int port) throws java.io.IOException
{
byte[] data = XByteBuffer.createDataPackage(indata);
String key = addr.getHostAddress()+":"+port;
IDataSender sender = (IDataSender)map.get(key);
- if ( sender == null ) throw new java.io.IOException("Sender not available. Make sure sender information is available to the ReplicationTransmitter.");
- if ( !sender.isConnected() ) sender.connect();
- sender.sendMessage(data);
+ sendMessageData(sessionId,data,sender);
}
- public void sendMessage(byte[] indata) throws java.io.IOException
+ public void sendMessage(String sessionId, byte[] indata) throws java.io.IOException
{
java.util.Iterator i = map.entrySet().iterator();
java.util.Vector v = new java.util.Vector();
@@ -152,14 +155,11 @@
IDataSender sender = (IDataSender)((java.util.Map.Entry)i.next()).getValue();
try
{
- if (!sender.isConnected())
- sender.connect();
- sender.sendMessage(data);
+ sendMessageData(sessionId,data,sender);
}catch ( Exception x)
{
- log.warn("Unable to send replicated message, is server down?",x);
+ log.warn("Unable to send replicated message to "+sender+", is server down?",x);
}
-
}//while
}
1.13 +25 -19 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java
Index: SimpleTcpCluster.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- SimpleTcpCluster.java 16 Apr 2003 04:06:55 -0000 1.12
+++ SimpleTcpCluster.java 18 Apr 2003 02:51:24 -0000 1.13
@@ -1,13 +1,7 @@
/*
-<<<<<<< SimpleTcpCluster.java
* $Header$
* $Revision$
* $Date$
-=======
- * $Header$
- * $Revision$
- * $Date$
->>>>>>> 1.7
*
* ====================================================================
*
@@ -275,7 +269,12 @@
* The channel configuration.
*/
protected String protocol = null;
-
+
+ /**
+ * The replication mode, can be either synchronous or asynchronous
+ * defaults to synchronous
+ */
+ protected String replicationMode="synchronous";
// ------------------------------------------------------------- Properties
public SimpleTcpCluster() {
@@ -320,7 +319,15 @@
return(this.debug);
}
-
+ public void setReplicationMode(String mode) {
+ if ("synchronous".equals(mode) ||
+ "asynchronous".equals(mode)) {
+ log.debug("Setting replcation mode to "+mode);
+ this.replicationMode = mode;
+ } else
+ throw new IllegalArgumentException("Replication mode must be either synchronous or asynchronous");
+
+ }
/**
* Set the name of the cluster to join, if no cluster with
* this name is present create one.
@@ -492,7 +499,7 @@
this.tcpSelectorTimeout);
mReplicationListener.setDaemon(true);
mReplicationListener.start();
- mReplicationTransmitter = new ReplicationTransmitter(new SocketSender[0]);
+ mReplicationTransmitter = new ReplicationTransmitter(new IDataSender[0]);
mReplicationTransmitter.start();
//wait 5 seconds to establish the view membership
@@ -526,13 +533,14 @@
if(destination != null) {
Member tcpdest = dest;
if ( (tcpdest != null) && (!localMember.equals(tcpdest))) {
- mReplicationTransmitter.sendMessage(data,
+ mReplicationTransmitter.sendMessage(msg.getSessionID(),
+ data,
InetAddress.getByName(tcpdest.getHost()),
tcpdest.getPort());
}//end if
}
else {
- mReplicationTransmitter.sendMessage(data);
+ mReplicationTransmitter.sendMessage(msg.getSessionID(),data);
}
} catch ( Exception x ) {
log.error("Unable to send message through tcp channel",x);
@@ -568,9 +576,7 @@
try {
log.info("Replication member added:" + member);
Member mbr = member;
- mReplicationTransmitter.add(
- new SocketSender(InetAddress.getByName(mbr.getHost()),
- mbr.getPort()));
+ mReplicationTransmitter.add(IDataSenderFactory.getIDataSender(replicationMode,mbr));
} catch ( Exception x ) {
log.error("Unable to connect to replication system.",x);
}
@@ -589,7 +595,7 @@
}
catch ( Exception x )
{
- log.error("Unable remove cluster node from replicaiton system.",x);
+ log.error("Unable remove cluster node from replication system.",x);
}
}
1.2 +10 -12 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java
Index: SocketSender.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SocketSender.java 19 Feb 2003 20:32:11 -0000 1.1
+++ SocketSender.java 18 Apr 2003 02:51:24 -0000 1.2
@@ -63,9 +63,6 @@
package org.apache.catalina.cluster.tcp;
import java.net.InetAddress ;
-import java.nio.channels.SocketChannel;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
import java.net.Socket;
/**
@@ -83,7 +80,6 @@
private InetAddress address;
private int port;
private Socket sc = null;
- protected ByteBuffer dbuf = ByteBuffer.allocateDirect(1024);
private boolean isSocketConnected = false;
public SocketSender(InetAddress host, int port)
@@ -104,12 +100,8 @@
public void connect() throws java.io.IOException
{
- //InetSocketAddress isa = new InetSocketAddress(getAddress(), getPort());
sc = new Socket(getAddress(),getPort());
isSocketConnected = true;
- // Connect
- //sc = SocketChannel.open();
- //sc.connect(isa);
}
public void disconnect()
@@ -132,7 +124,7 @@
* @param data
* @throws java.io.IOException
*/
- public synchronized void sendMessage(byte[] data) throws java.io.IOException
+ public synchronized void sendMessage(String sessionId, byte[] data) throws java.io.IOException
{
if ( !isConnected() ) connect();
try
@@ -145,6 +137,12 @@
connect();
sc.getOutputStream().write(data);
}
+ }
+
+ public String toString() {
+ StringBuffer buf = new StringBuffer("SocketSender[");
+ buf.append(getAddress()).append(":").append(getPort()).append("]");
+ return buf.toString();
}
1.2 +7 -7 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ThreadPool.java
Index: ThreadPool.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ThreadPool.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- ThreadPool.java 19 Feb 2003 20:32:11 -0000 1.1
+++ ThreadPool.java 18 Apr 2003 02:51:24 -0000 1.2
@@ -84,14 +84,14 @@
List idle = new LinkedList();
- ThreadPool (int poolSize)
- {
+ ThreadPool (int poolSize, Class threadClass) throws Exception {
// fill up the pool with worker threads
for (int i = 0; i < poolSize; i++) {
- WorkerThread thread = new WorkerThread (this);
+ WorkerThread thread = (WorkerThread)threadClass.newInstance();
+ thread.setPool(this);
// set thread name for debugging, start it
- thread.setName ("Tcp Replication Thread[" + (i + 1)+"]");
+ thread.setName (threadClass.getName()+"[" + (i + 1)+"]");
thread.setDaemon(true);
thread.start();
1.3 +10 -116 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/WorkerThread.java
Index: WorkerThread.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/WorkerThread.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- WorkerThread.java 20 Mar 2003 20:46:14 -0000 1.2
+++ WorkerThread.java 18 Apr 2003 02:51:24 -0000 1.3
@@ -62,38 +62,18 @@
*/
package org.apache.catalina.cluster.tcp;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.util.List;
-import java.io.IOException;
-import java.nio.channels.SocketChannel;
-import org.apache.catalina.cluster.io.ObjectReader;
-import java.util.LinkedList;
-/**
- * A worker thread class which can drain channels and echo-back
- * the input. Each instance is constructed with a reference to
- * the owning thread pool object. When started, the thread loops
- * forever waiting to be awakened to service the channel associated
- * with a SelectionKey object.
- * The worker is tasked by calling its serviceChannel() method
- * with a SelectionKey object. The serviceChannel() method stores
- * the key reference in the thread object then calls notify()
- * to wake it up. When the channel has been drained, the worker
- * thread returns itself to its parent pool.
- */
+
+
public class WorkerThread extends Thread
{
private static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog( SimpleTcpCluster.class );
- private ByteBuffer buffer = ByteBuffer.allocate (1024);
- private ThreadPool pool;
- private SelectionKey key;
- private boolean doRun = true;
+ protected ThreadPool pool;
+ protected boolean doRun = true;
- WorkerThread (ThreadPool pool)
- {
- this.pool = pool;
+
+ public void setPool(ThreadPool pool) {
+ this.pool = pool;
}
public synchronized void close()
@@ -101,91 +81,5 @@
doRun = false;
notify();
- }
-
- // loop forever waiting for work to do
- public synchronized void run()
- {
- while (doRun) {
- try {
- // sleep and release object lock
- this.wait();
- } catch (InterruptedException e) {
- log.info("TCP worker thread interrupted in cluster",e);
- // clear interrupt status
- this.interrupted();
- }
- if (key == null) {
- continue; // just in case
- }
- try {
- drainChannel (key);
- } catch (Exception e) {
- log.info ("TCP Worker thread in cluster caught '"
- + e + "' closing channel");
- // close channel and nudge selector
- try {
- key.channel().close();
- } catch (IOException ex) {
- log.error("Unable to close channel.",ex);
- }
- key.selector().wakeup();
- }
- key = null;
- // done, ready for more, return to pool
- this.pool.returnWorker (this);
- }
- }
-
- /**
- * Called to initiate a unit of work by this worker thread
- * on the provided SelectionKey object. This method is
- * synchronized, as is the run() method, so only one key
- * can be serviced at a given time.
- * Before waking the worker thread, and before returning
- * to the main selection loop, this key's interest set is
- * updated to remove OP_READ. This will cause the selector
- * to ignore read-readiness for this channel while the
- * worker thread is servicing it.
- */
- synchronized void serviceChannel (SelectionKey key)
- {
- this.key = key;
- key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
- this.notify(); // awaken the thread
- }
-
- /**
- * The actual code which drains the channel associated with
- * the given key. This method assumes the key has been
- * modified prior to invocation to turn off selection
- * interest in OP_READ. When this method completes it
- * re-enables OP_READ and calls wakeup() on the selector
- * so the selector will resume watching this channel.
- */
- void drainChannel (SelectionKey key)
- throws Exception
- {
- SocketChannel channel = (SocketChannel) key.channel();
- int count;
- buffer.clear(); // make buffer empty
- ObjectReader reader = (ObjectReader)key.attachment();
- // loop while data available, channel is non-blocking
- while ((count = channel.read (buffer)) > 0) {
- buffer.flip(); // make buffer readable
- reader.append(buffer.array(),0,count);
- buffer.clear(); // make buffer empty
- }
- //check to see if any data is available
- reader.execute();
- if (count < 0) {
- // close channel on EOF, invalidates the key
- channel.close();
- return;
- }
- // resume interest in OP_READ
- key.interestOps (key.interestOps() | SelectionKey.OP_READ);
- // cycle the selector so this key is active again
- key.selector().wakeup();
}
}
1.1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java
Index: AsyncSocketSender.java
===================================================================
/*
* $Header: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java,v 1.1 2003/04/18 02:51:24 fhanik Exp $
* $Revision: 1.1 $
* $Date: 2003/04/18 02:51:24 $
*
* ====================================================================
*
* The Apache Software License, Version 1.1
*
* Copyright (c) 1999 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution, if
* any, must include the following acknowlegement:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowlegement may appear in the software itself,
* if and wherever such third-party acknowlegements normally appear.
*
* 4. The names "The Jakarta Project", "Tomcat", and "Apache Software
* Foundation" must not be used to endorse or promote products derived
* from this software without prior written permission. For written
* permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache"
* nor may "Apache" appear in their names without prior written
* permission of the Apache Group.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
* [Additional notices, if required by prior licensing conditions]
*
*/
package org.apache.catalina.cluster.tcp;
import java.net.InetAddress ;
import java.net.Socket;
import java.io.IOException;
import org.apache.catalina.cluster.util.SmartQueue;
/**
 * {@link IDataSender} implementation that does not write to the socket on
 * the caller's thread. Messages handed to
 * {@link #sendMessage(String, byte[])} are placed on a {@link SmartQueue};
 * a daemon {@link QueueThread} started by the constructor takes entries
 * off that queue and writes them to the remote member over a blocking
 * {@link Socket}.
 */
public class AsyncSocketSender implements IDataSender {
    /** Address of the remote member this sender writes to. */
    private InetAddress address;
    /** TCP port of the remote member this sender writes to. */
    private int port;
    /** The current socket, or null until {@link #connect()} has been called. */
    private Socket sc = null;
    /** Set by {@link #connect()}, cleared by {@link #disconnect()}. */
    private boolean isSocketConnected = false;
    /** Pending messages, drained by the queue thread. */
    private SmartQueue queue = new SmartQueue();

    /**
     * Creates the sender and starts its daemon queue thread. No socket is
     * opened here; the connection is established lazily on the first send
     * or by an explicit call to {@link #connect()}.
     *
     * @param host address of the remote member
     * @param port TCP port of the remote member
     */
    public AsyncSocketSender(InetAddress host, int port) {
        this.address = host;
        this.port = port;
        QueueThread t = new QueueThread(this);
        t.setDaemon(true);
        t.start();
        SimpleTcpCluster.log.info("Started async sender thread for TCP replication.");
    }

    /** @return the address of the remote member */
    public InetAddress getAddress() {
        return address;
    }

    /** @return the TCP port of the remote member */
    public int getPort() {
        return port;
    }

    /**
     * Opens a new socket to the remote member and marks this sender as
     * connected.
     *
     * @throws java.io.IOException if the socket cannot be opened
     */
    public void connect() throws java.io.IOException {
        sc = new Socket(getAddress(),getPort());
        isSocketConnected = true;
    }

    /**
     * Closes the current socket, if any, and marks this sender as
     * disconnected. Closing is best-effort: a failure to close is logged
     * at debug level and never propagated, so callers can always follow
     * up with {@link #connect()}.
     */
    public void disconnect() {
        try {
            // sc is null when disconnect() is called before any connect();
            // check explicitly instead of relying on a swallowed NPE
            if (sc != null) {
                sc.close();
            }
        } catch (Exception x) {
            SimpleTcpCluster.log.debug("Unable to close replication socket for " + this, x);
        }
        isSocketConnected = false;
    }

    /** @return true if {@link #connect()} succeeded and {@link #disconnect()} has not been called since */
    public boolean isConnected() {
        return isSocketConnected;
    }

    /**
     * Blocking send, used by the queue thread. Connects first if needed.
     * If the write fails, the socket is closed and reopened and the write
     * is attempted exactly once more; a failure of that second attempt is
     * propagated to the caller.
     *
     * @param data the bytes to write to the socket
     * @throws java.io.IOException if connecting fails or the retry write fails
     */
    private synchronized void sendMessage(byte[] data) throws java.io.IOException {
        if ( !isConnected() ) connect();
        try {
            sc.getOutputStream().write(data);
        } catch ( java.io.IOException x ) {
            // the connection may have gone stale; reconnect and retry once
            disconnect();
            connect();
            sc.getOutputStream().write(data);
        }
    }

    /**
     * Queues a message for delivery by the queue thread. The data is not
     * written to the socket by this call.
     *
     * NOTE(review): this method shares its monitor with the blocking
     * {@link #sendMessage(byte[])}, so a caller can still be held up while
     * the queue thread is in the middle of a socket write. Dropping
     * <code>synchronized</code> here would avoid that, provided SmartQueue
     * is itself safe for concurrent add() calls -- confirm before changing.
     *
     * @param sessionId id of the session the data belongs to, used as the queue entry key
     * @param data the bytes to send
     * @throws java.io.IOException declared by {@link IDataSender}
     */
    public synchronized void sendMessage(String sessionId, byte[] data) throws java.io.IOException {
        SmartQueue.SmartEntry entry = new SmartQueue.SmartEntry(sessionId,data);
        queue.add(entry);
    }

    /**
     * @return a label of the form <code>AsyncSocketSender[address:port]</code>,
     *         so log output distinguishes this sender from the synchronous one
     */
    public String toString() {
        StringBuffer buf = new StringBuffer("AsyncSocketSender[");
        buf.append(getAddress()).append(":").append(getPort()).append("]");
        return buf.toString();
    }

    /**
     * Thread that loops forever taking entries off the sender's queue and
     * writing them with the blocking send. A failed send is logged and the
     * message is dropped; the loop then continues with the next entry.
     */
    private class QueueThread extends Thread {
        AsyncSocketSender sender;

        public QueueThread(AsyncSocketSender sender) {
            this.sender = sender;
        }

        public void run() {
            while (true) {
                // presumably remove() blocks until an entry is available;
                // if it can return null immediately this loop spins -- TODO confirm
                SmartQueue.SmartEntry entry = sender.queue.remove();
                if ( entry != null ) {
                    try {
                        byte[] data = (byte[]) entry.getValue();
                        sender.sendMessage(data);
                    }
                    catch (Exception x) {
                        // pass the exception along so the cause of the
                        // failed send is visible in the log
                        SimpleTcpCluster.log.warn(
                            "Unable to asynchronously send session w/ id=" +
                            entry.getKey()+" message will be ignored.", x);
                    }
                }
            }
        }
    }
}
1.1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSenderFactory.java
Index: IDataSenderFactory.java
===================================================================
/*
* $Header: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSenderFactory.java,v 1.1 2003/04/18 02:51:24 fhanik Exp $
* $Revision: 1.1 $
* $Date: 2003/04/18 02:51:24 $
*
* ====================================================================
*
* The Apache Software License, Version 1.1
*
* Copyright (c) 1999 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution, if
* any, must include the following acknowlegement:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowlegement may appear in the software itself,
* if and wherever such third-party acknowlegements normally appear.
*
* 4. The names "The Jakarta Project", "Tomcat", and "Apache Software
* Foundation" must not be used to endorse or promote products derived
* from this software without prior written permission. For written
* permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache"
* nor may "Apache" appear in their names without prior written
* permission of the Apache Group.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
* [Additional notices, if required by prior licensing conditions]
*
*/
package org.apache.catalina.cluster.tcp;
import org.apache.catalina.cluster.Member;
import java.net.InetAddress;
/**
 * Static factory that builds the {@link IDataSender} matching a
 * replication mode name. Not instantiable.
 */
public class IDataSenderFactory {
    /** Mode name that selects a {@link SocketSender}. */
    public static final String SYNC_MODE="synchronous";
    /** Mode name that selects an {@link AsyncSocketSender}. */
    public static final String ASYNC_MODE="asynchronous";

    private IDataSenderFactory() {
    }

    /**
     * Creates a sender for the given member.
     *
     * @param mode either {@link #SYNC_MODE} or {@link #ASYNC_MODE}
     * @param mbr the member whose host and port the sender targets
     * @return a new SocketSender for the synchronous mode, a new
     *         AsyncSocketSender for the asynchronous mode
     * @throws java.io.IOException if the mode is neither of the two known
     *         names, or if the member's host name cannot be resolved
     */
    public synchronized static IDataSender getIDataSender(String mode, Member mbr)
        throws java.io.IOException {
        boolean synchronous = SYNC_MODE.equals(mode);
        // reject unknown modes before the member is touched at all
        if ( !synchronous && !ASYNC_MODE.equals(mode) ) {
            throw new java.io.IOException("Invalid replication mode="+mode);
        }
        InetAddress host = InetAddress.getByName(mbr.getHost());
        int port = mbr.getPort();
        if ( synchronous ) {
            return new SocketSender(host, port);
        }
        return new AsyncSocketSender(host, port);
    }
}
1.1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java
Index: TcpReplicationThread.java
===================================================================
/*
* $Header: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java,v 1.1 2003/04/18 02:51:24 fhanik Exp $
* $Revision: 1.1 $
* $Date: 2003/04/18 02:51:24 $
*
* ====================================================================
*
* The Apache Software License, Version 1.1
*
* Copyright (c) 1999 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution, if
* any, must include the following acknowlegement:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowlegement may appear in the software itself,
* if and wherever such third-party acknowlegements normally appear.
*
* 4. The names "The Jakarta Project", "Tomcat", and "Apache Software
* Foundation" must not be used to endorse or promote products derived
* from this software without prior written permission. For written
* permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache"
* nor may "Apache" appear in their names without prior written
* permission of the Apache Group.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
* [Additional notices, if required by prior licensing conditions]
*
*/
package org.apache.catalina.cluster.tcp;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.List;
import java.io.IOException;
import java.nio.channels.SocketChannel;
import org.apache.catalina.cluster.io.ObjectReader;
import java.util.LinkedList;
/**
 * A worker thread that drains readable socket channels and feeds the
 * bytes to the {@link ObjectReader} attached to the channel's
 * SelectionKey. The owning thread pool is the inherited <code>pool</code>
 * field of {@link WorkerThread}. Once started, the thread loops
 * waiting to be awakened to service a key.
 * The worker is tasked by calling its serviceChannel() method
 * with a SelectionKey object. The serviceChannel() method stores
 * the key reference in the thread object then calls notify()
 * to wake it up. When the channel has been drained, the worker
 * thread returns itself to its parent pool.
 */
public class TcpReplicationThread extends WorkerThread
{
    private static org.apache.commons.logging.Log log =
        org.apache.commons.logging.LogFactory.getLog( SimpleTcpCluster.class );
    /** Scratch buffer for channel reads; reused for every key this thread services. */
    private ByteBuffer buffer = ByteBuffer.allocate (1024);
    /** The key currently assigned to this worker, or null when it has no work. */
    private SelectionKey key;

    TcpReplicationThread ()
    {
    }

    /**
     * Loops until the inherited <code>doRun</code> flag is cleared,
     * sleeping on this object's monitor until a key has been assigned,
     * then draining that key's channel and returning this worker to the
     * pool. If draining fails, the channel is closed and the selector is
     * woken up.
     */
    public synchronized void run()
    {
        while (doRun) {
            try {
                // Sleep (releasing the object lock) only while there is no
                // work. Checking the condition first means a key handed
                // over by serviceChannel() before this thread reached
                // wait() is not lost, and spurious wakeups simply wait again.
                while (doRun && key == null) {
                    this.wait();
                }
            } catch (InterruptedException e) {
                log.info("TCP worker thread interrupted in cluster",e);
                // clear interrupt status of the current (this) thread
                Thread.interrupted();
            }
            if (key == null) {
                continue; // woken without work (close() or interrupt)
            }
            try {
                drainChannel (key);
            } catch (Exception e) {
                log.info ("TCP Worker thread in cluster caught '"
                          + e + "' closing channel");
                // close channel and nudge selector
                try {
                    key.channel().close();
                } catch (IOException ex) {
                    log.error("Unable to close channel.",ex);
                }
                key.selector().wakeup();
            }
            key = null;
            // done, ready for more, return to pool
            this.pool.returnWorker (this);
        }
    }

    /**
     * Called to initiate a unit of work by this worker thread
     * on the provided SelectionKey object. This method is
     * synchronized, as is the run() method, so only one key
     * can be serviced at a given time.
     * Before waking the worker thread, and before returning
     * to the caller, this key's interest set is
     * updated to remove OP_READ, so the selector ignores
     * read-readiness for this channel while the
     * worker thread is servicing it.
     *
     * @param key the selection key whose channel should be drained
     */
    synchronized void serviceChannel (SelectionKey key)
    {
        this.key = key;
        key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
        this.notify(); // awaken the thread
    }

    /**
     * The actual code which drains the channel associated with
     * the given key. Everything read is appended to the ObjectReader
     * attached to the key, and the reader's execute() is invoked once
     * the channel has no more data to offer. On end-of-stream the
     * channel is closed; otherwise OP_READ is re-enabled on the key and
     * wakeup() is called on the selector.
     *
     * @param key the selection key to service; its attachment must be an ObjectReader
     * @throws Exception if reading from the channel or the reader fails
     */
    void drainChannel (SelectionKey key)
        throws Exception
    {
        SocketChannel channel = (SocketChannel) key.channel();
        int count;
        buffer.clear(); // make buffer empty
        ObjectReader reader = (ObjectReader)key.attachment();
        // loop while data available; a read returning 0 ends the loop
        // (presumably the channel is non-blocking -- configured elsewhere)
        while ((count = channel.read (buffer)) > 0) {
            buffer.flip(); // make buffer readable
            reader.append(buffer.array(),0,count);
            buffer.clear(); // make buffer empty
        }
        // let the reader process whatever has been accumulated so far
        reader.execute();
        if (count < 0) {
            // close channel on EOF, invalidates the key
            channel.close();
            return;
        }
        // resume interest in OP_READ
        key.interestOps (key.interestOps() | SelectionKey.OP_READ);
        // cycle the selector so this key is active again
        key.selector().wakeup();
    }
}
1.1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/util/SmartQueue.java
Index: SmartQueue.java
===================================================================
/*
* $Header: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/util/SmartQueue.java,v 1.1 2003/04/18 02:51:24 fhanik Exp $
* $Revision: 1.1 $
* $Date: 2003/04/18 02:51:24 $
*
* ====================================================================
*
* The Apache Software License, Version 1.1
*
* Copyright (c) 1999 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution, if
* any, must include the following acknowlegement:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowlegement may appear in the software itself,
* if and wherever such third-party acknowlegements normally appear.
*
* 4. The names "The Jakarta Project", "Tomcat", and "Apache Software
* Foundation" must not be used to endorse or promote products derived
* from this software without prior written permission. For written
* permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache"
* nor may "Apache" appear in their names without prior written
* permission of the Apache Group.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
* [Additional notices, if required by prior licensing conditions]
*
*/
package org.apache.catalina.cluster.util;
/**
* A smart queue, used for async replication<BR>
* the "smart" part of this queue is that if the session is already queued for replication,
* and it is updated again, the session will simply be replaced, hence we don't
* replicate stuff that is obsolete.
* Put this into util, since it is quite generic.
*
* @author Filip Hanik
* @version 1.0
*/
import java.util.LinkedList;
import java.util.HashMap;
/**
 * A blocking FIFO queue of keyed entries in which every key is queued at
 * most once. Adding an entry whose key is already queued replaces the value
 * of the queued entry in place (keeping its position) instead of queueing a
 * second entry. All access to the internal collections is serialized on a
 * private mutex.
 */
public class SmartQueue {
    /**
     * This is the actual queue; the head is the next entry handed out.
     */
    private final LinkedList queue = new LinkedList();
    /**
     * Maps key to queued entry. This is only for performance: it lets add()
     * find an already-queued key without scanning the list.
     */
    private final HashMap queueMap = new HashMap();
    /**
     * Guards queue and queueMap and is the monitor remove() waits on.
     */
    private final Object mutex = new Object();
    /**
     * Any non-zero value makes the queue trace its activity to System.out.
     */
    public static int debug = 0;

    public SmartQueue() {
    }

    /**
     * Add an object to the queue. If an entry with the same key is already
     * queued, only its value is replaced; otherwise the entry is appended to
     * the end of the queue. Waiting consumers are woken in both cases.
     *
     * @param entry - the smart entry
     */
    public void add(SmartEntry entry) {
        /*make sure we are within a synchronized block since we are dealing with two
          unsync collections*/
        synchronized (mutex) {
            /*check to see if this object has already been queued*/
            SmartEntry current = (SmartEntry)queueMap.get(entry.getKey());
            if ( current == null ) {
                /*the object has not been queued, add it to the end of the queue*/
                if ( debug != 0 ) System.out.println("["+Thread.currentThread().getName()+"][SmartQueue] Adding new object="+entry);
                queue.addLast(entry);
                queueMap.put(entry.getKey(),entry);
            } else {
                /*the object has been queued, replace the value*/
                if ( debug != 0 ) System.out.print("["+Thread.currentThread().getName()+"][SmartQueue] Replacing old object="+current);
                current.setValue(entry.getValue());
                if ( debug != 0 ) System.out.println("with new object="+current);
            }
            /*wake up all the threads that are waiting in remove()*/
            mutex.notifyAll();
        }
    }

    /**
     * @return the number of entries currently queued
     */
    public int size() {
        synchronized (mutex) {
            return queue.size();
        }
    }

    /**
     * Removes and returns the entry at the head of the queue, blocking for
     * as long as the queue is empty.
     * <p>
     * An interrupt does not abort the wait. If the calling thread is
     * interrupted while blocked here, the interrupt is remembered and the
     * thread's interrupt status is set again before this method returns, so
     * the caller can still observe it.
     *
     * @return the oldest queued entry, never null
     */
    public SmartEntry remove() {
        SmartEntry result = null;
        boolean interrupted = false;
        try {
            synchronized (mutex) {
                while ( size() == 0 ) {
                    try {
                        if ( debug != 0 ) System.out.println("["+Thread.currentThread().getName()+"][SmartQueue] Queue sleeping until object added size="+size()+".");
                        mutex.wait();
                        if ( debug != 0 ) System.out.println("["+Thread.currentThread().getName()+"][SmartQueue] Queue woke up or interrupted size="+size()+".");
                    }
                    catch(InterruptedException ex) {
                        /*keep waiting, but do not lose the interrupt; the
                          status was cleared when the exception was thrown*/
                        interrupted = true;
                    }//catch
                }//while
                /*guaranteed that we are not empty by now*/
                result = (SmartEntry)queue.removeFirst();
                queueMap.remove(result.getKey());
                if ( debug != 0 ) System.out.println("["+Thread.currentThread().getName()+"][SmartQueue] Returning="+result);
            }
        } finally {
            if ( interrupted ) {
                /*hand the interrupt back to the caller*/
                Thread.currentThread().interrupt();
            }
        }
        return result;
    }

    /**
     * A key/value pair held by the queue. The key identifies the entry; both
     * equals() and hashCode() are based on the key alone. Neither key nor
     * value may be null.
     */
    public static class SmartEntry {
        protected Object key;
        protected Object value;

        /**
         * @param key the identity of the entry, must not be null
         * @param value the payload, must not be null
         * @throws IllegalArgumentException if key or value is null
         */
        public SmartEntry(Object key,
                          Object value) {
            if ( key == null ) throw new IllegalArgumentException("SmartEntry key can not be null.");
            if ( value == null ) throw new IllegalArgumentException("SmartEntry value can not be null.");
            this.key = key;
            this.value = value;
        }

        public Object getKey() {
            return key;
        }

        public Object getValue() {
            return value;
        }

        /**
         * @param value the new payload, must not be null
         * @throws IllegalArgumentException if value is null
         */
        public void setValue(Object value) {
            if ( value == null ) throw new IllegalArgumentException("SmartEntry value can not be null.");
            this.value = value;
        }

        public int hashCode() {
            return key.hashCode();
        }

        /**
         * Two entries are equal when their keys are equal; the value is not
         * compared.
         */
        public boolean equals(Object o) {
            if (!(o instanceof SmartEntry)) return false;
            SmartEntry other = (SmartEntry)o;
            return other.getKey().equals(getKey());
        }

        public String toString() {
            return "[SmartyEntry key="+key+" value="+value+"]";
        }
    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: tomcat-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: tomcat-dev-help@jakarta.apache.org