You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by pe...@apache.org on 2005/11/18 22:07:36 UTC
svn commit: r345567 - in /tomcat/container/tc5.5.x:
modules/cluster/src/share/org/apache/catalina/cluster/tcp/
modules/cluster/src/share/org/apache/catalina/cluster/util/ webapps/docs/
Author: pero
Date: Fri Nov 18 13:07:23 2005
New Revision: 345567
URL: http://svn.apache.org/viewcvs?rev=345567&view=rev
Log:
Fix that sendMessage signature at all DataSender subclasses must be changed.
Fix that socket at o.a.c.cluster.tcp.FastAsyncSocketSender can be disconnect/connect
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/util/FastQueue.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/util/SingleRemoveSynchronizedAddLock.java
tomcat/container/tc5.5.x/webapps/docs/changelog.xml
Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java?rev=345567&r1=345566&r2=345567&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java Fri Nov 18 13:07:23 2005
@@ -152,23 +152,23 @@
super.disconnect();
}
- /*
+ /**
* Send message to queue for later sending
*
- * @see org.apache.catalina.cluster.tcp.IDataSender#sendMessage(java.lang.String,
- * byte[])
+ * @see org.apache.catalina.cluster.tcp.DataSender#pushMessage(ClusterData)
*/
- public void sendMessage(String messageid, ClusterData data)
+ public void sendMessage(ClusterData data)
throws java.io.IOException {
- SmartQueue.SmartEntry entry = new SmartQueue.SmartEntry(messageid, data);
+ SmartQueue.SmartEntry entry = new SmartQueue.SmartEntry(data.getUniqueId(), data);
queue.add(entry);
synchronized (this) {
inQueueCounter++;
- queueThread.incQueuedNrOfBytes(data.getMessage().length);
+ if(queueThread != null)
+ queueThread.incQueuedNrOfBytes(data.getMessage().length);
}
if (log.isTraceEnabled())
log.trace(sm.getString("AsyncSocketSender.queue.message",
- getAddress().getHostAddress(), new Integer(getPort()), messageid, new Long(
+ getAddress().getHostAddress(), new Integer(getPort()), data.getUniqueId(), new Long(
data.getMessage().length)));
}
Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java?rev=345567&r1=345566&r2=345567&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java Fri Nov 18 13:07:23 2005
@@ -666,6 +666,11 @@
if(isConnected())
return ;
try {
+ throw new Exception("open socket") ;
+ } catch ( Exception ex2) {
+ log.debug("open socket exception",ex2);
+ }
+ try {
createSocket();
if (isWaitForAck())
socket.setSoTimeout((int) ackTimeout);
Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java?rev=345567&r1=345566&r2=345567&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java Fri Nov 18 13:07:23 2005
@@ -273,15 +273,16 @@
// --------------------------------------------------------- Public Methods
- /*
- * Connect to socket and start background thread to ppush queued messages
+ /**
+ * Connect to socket and start background thread to push queued messages
*
* @see org.apache.catalina.cluster.tcp.IDataSender#connect()
*/
public void connect() throws java.io.IOException {
super.connect();
checkThread();
- queue.start() ;
+ if(!queue.isEnabled())
+ queue.start() ;
}
/**
@@ -291,25 +292,30 @@
*/
public void disconnect() {
stopThread();
+ // delete "remove" lock at queue
queue.stop() ;
+ // enable that sendMessage can add new messages
+ queue.start() ;
+ // close socket
super.disconnect();
}
/**
* Send message to queue for later sending.
*
- * @see org.apache.catalina.cluster.tcp.IDataSender#sendMessage(ClusterData)
+ * @see org.apache.catalina.cluster.tcp.DataSender#pushMessage(ClusterData)
*/
- public void sendMessage(String messageid, ClusterData data)
+ public void sendMessage(ClusterData data)
throws java.io.IOException {
- queue.add(messageid, data);
+ queue.add(data.getUniqueId(), data);
synchronized (this) {
inQueueCounter++;
- queueThread.incQueuedNrOfBytes(data.getMessage().length);
- }
- if (log.isTraceEnabled())
+ if(queueThread != null)
+ queueThread.incQueuedNrOfBytes(data.getMessage().length);
+ }
+ if (log.isTraceEnabled())
log.trace(sm.getString("AsyncSocketSender.queue.message",
- getAddress().getHostAddress(), new Integer(getPort()), messageid, new Long(
+ getAddress().getHostAddress(), new Integer(getPort()), data.getUniqueId(), new Long(
data.getMessage().length)));
}
@@ -417,17 +423,20 @@
}
- public void stopRunning() {
+ /**
+ * Stop backend thread!
+ */
+ public void stopRunning() {
keepRunning = false;
}
- /* Get the objects from queue and send all mesages to the sender.
+ /**
+ * Get the objects from queue and send all mesages to the sender.
* @see java.lang.Runnable#run()
*/
public void run() {
while (keepRunning) {
- long queueSize;
LinkObject entry = getQueuedMessage();
if (entry != null) {
pushQueuedMessages(entry);
@@ -442,7 +451,8 @@
}
/**
- * @return
+ * Get List of queue cluster messages
+ * @return list of cluster messages
*/
protected LinkObject getQueuedMessage() {
// get a link list of all queued objects
Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java?rev=345567&r1=345566&r2=345567&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java Fri Nov 18 13:07:23 2005
@@ -90,32 +90,32 @@
// ----------------------------------------------------- Public Methode
- public void connect() throws java.io.IOException {
+ public synchronized void connect() throws java.io.IOException {
//do nothing, happens in the socket sender itself
senderQueue.open();
setSocketConnected(true);
connectCounter++;
}
- public void disconnect() {
+ public synchronized void disconnect() {
senderQueue.close();
setSocketConnected(false);
disconnectCounter++;
}
/**
- * send Message and use a pool of SocketSenders
+ * send message and use a pool of SocketSenders
*
* @param messageId Message unique identifier
* @param data Message data
* @throws java.io.IOException
*/
- public void sendMessage(String messageId, ClusterData data) throws IOException {
+ public void sendMessage(ClusterData data) throws IOException {
//get a socket sender from the pool
if(!isConnected()) {
synchronized(this) {
- if(!isConnected())
- connect();
+ if(!isConnected())
+ connect();
}
}
SocketSender sender = senderQueue.getSender(0);
Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java?rev=345567&r1=345566&r2=345567&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java Fri Nov 18 13:07:23 2005
@@ -822,7 +822,6 @@
// FIXME add Stats how much comress and uncompress messages and bytes are transfered
if ((isCompress() && msg.getCompress() != ClusterMessage.FLAG_FORBIDDEN)
|| msg.getCompress() == ClusterMessage.FLAG_ALLOWED) {
- System.out.println("msg compress: " + msg.getUniqueId() );
gout = new GZIPOutputStream(outs);
out = new ObjectOutputStream(gout);
} else {
Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/util/FastQueue.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/util/FastQueue.java?rev=345567&r1=345566&r2=345567&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/util/FastQueue.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/util/FastQueue.java Fri Nov 18 13:07:23 2005
@@ -65,19 +65,10 @@
*/
private boolean doStats = false;
- /**
- *
- */
private boolean inAdd = false;
- /**
- *
- */
private boolean inRemove = false;
- /**
- *
- */
private boolean inMutex = false;
/**
@@ -141,14 +132,8 @@
*/
private long avgSize = 0;
- /*
- *
- */
private int maxSizeSample = 0;
- /*
- *
- */
private long avgSizeSample = 0;
/**
@@ -206,7 +191,7 @@
lock.setRemoveWaitTimeout(removeWaitTimeout);
}
- /*
+ /**
* get Max Queue length
*
* @see org.apache.catalina.cluster.util.IQueue#getMaxQueueLength()
@@ -230,42 +215,42 @@
}
}
- /*
+ /**
* @return Returns the checkLock.
*/
public boolean isCheckLock() {
return checkLock;
}
- /*
+ /**
* @param checkLock The checkLock to set.
*/
public void setCheckLock(boolean checkLock) {
this.checkLock = checkLock;
}
- /*
+ /**
* @return Returns the doStats.
*/
public boolean isDoStats() {
return doStats;
}
- /*
+ /**
* @param doStats The doStats to set.
*/
public void setDoStats(boolean doStats) {
this.doStats = doStats;
}
- /*
+ /**
* @return Returns the timeWait.
*/
public boolean isTimeWait() {
return timeWait;
}
- /*
+ /**
* @param timeWait The timeWait to set.
*/
public void setTimeWait(boolean timeWait) {
@@ -426,7 +411,8 @@
return sz;
}
- /* Add new data to the queue
+ /**
+ * Add new data to the queue
* @see org.apache.catalina.cluster.util.IQueue#add(java.lang.String, java.lang.Object)
* FIXME extract some method
*/
@@ -436,7 +422,7 @@
if (!enabled) {
if (log.isInfoEnabled())
- log.info("FastQueue: queue disabled, add aborted");
+ log.info("FastQueue.add: queue disabled, add aborted");
return false;
}
@@ -450,7 +436,7 @@
}
if (log.isTraceEnabled()) {
- log.trace("FastQueue: add starting with size " + size);
+ log.trace("FastQueue.add: starting with size " + size);
}
if (checkLock) {
if (inAdd)
@@ -464,7 +450,7 @@
if ((maxQueueLength > 0) && (size >= maxQueueLength)) {
ok = false;
if (log.isTraceEnabled()) {
- log.trace("FastQueue: Could not add, since queue is full ("
+ log.trace("FastQueue.add: Could not add, since queue is full ("
+ size + ">=" + maxQueueLength + ")");
}
@@ -477,7 +463,7 @@
if (last == null) {
ok = false;
log
- .error("FastQueue: Could not add, since last is null although size is "
+ .error("FastQueue.add: Could not add, since last is null although size is "
+ size + " (>0)");
} else {
last.append(element);
@@ -509,24 +495,24 @@
}
if (first == null) {
- log.error("FastQueue: first is null, size is " + size
+ log.error("FastQueue.add: first is null, size is " + size
+ " at end of add");
}
if (last == null) {
- log.error("FastQueue: last is null, size is " + size
+ log.error("FastQueue.add: last is null, size is " + size
+ " at end of add");
}
if (checkLock) {
if (!inMutex)
- log.warn("FastQueue: Cancelled by other mutex in add");
+ log.warn("FastQueue.add: Cancelled by other mutex in add");
inMutex = false;
if (!inAdd)
- log.warn("FastQueue: Cancelled by other add");
+ log.warn("FastQueue.add: Cancelled by other add");
inAdd = false;
}
if (log.isTraceEnabled()) {
- log.trace("FastQueue: add ending with size " + size);
+ log.trace("FastQueue.add: add ending with size " + size);
}
if (timeWait) {
@@ -541,7 +527,8 @@
return ok;
}
- /* remove the complete queued object list
+ /**
+ * remove the complete queued object list
* @see org.apache.catalina.cluster.util.IQueue#remove()
* FIXME extract some method
*/
@@ -552,7 +539,7 @@
if (!enabled) {
if (log.isInfoEnabled())
- log.info("FastQueue: queue disabled, remove aborted");
+ log.info("FastQueue.remove: queue disabled, remove aborted");
return null;
}
@@ -571,11 +558,10 @@
removeErrorCounter++;
}
if (log.isInfoEnabled())
- log
- .info("FastQueue: Remove aborted although queue enabled");
+ log.info("FastQueue.remove: Remove aborted although queue enabled");
} else {
if (log.isInfoEnabled())
- log.info("FastQueue: queue disabled, remove aborted");
+ log.info("FastQueue.remove: queue disabled, remove aborted");
}
return null;
}
@@ -585,14 +571,14 @@
}
if (log.isTraceEnabled()) {
- log.trace("FastQueue: remove starting with size " + size);
+ log.trace("FastQueue.remove: remove starting with size " + size);
}
if (checkLock) {
if (inRemove)
- log.warn("FastQueue: Detected other remove");
+ log.warn("FastQueue.remove: Detected other remove");
inRemove = true;
if (inMutex)
- log.warn("FastQueue: Detected other mutex in remove");
+ log.warn("FastQueue.remove: Detected other mutex in remove");
inMutex = true;
}
@@ -604,7 +590,7 @@
} else {
removeErrorCounter++;
log
- .error("FastQueue: Could not remove, since first is null although size is "
+ .error("FastQueue.remove: Could not remove, since first is null although size is "
+ size + " (>0)");
}
}
@@ -614,14 +600,14 @@
if (checkLock) {
if (!inMutex)
- log.warn("FastQueue: Cancelled by other mutex in remove");
+ log.warn("FastQueue.remove: Cancelled by other mutex in remove");
inMutex = false;
if (!inRemove)
- log.warn("FastQueue: Cancelled by other remove");
+ log.warn("FastQueue.remove: Cancelled by other remove");
inRemove = false;
}
if (log.isTraceEnabled()) {
- log.trace("FastQueue: remove ending with size " + size);
+ log.trace("FastQueue.remove: remove ending with size " + size);
}
if (timeWait) {
Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/util/SingleRemoveSynchronizedAddLock.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/util/SingleRemoveSynchronizedAddLock.java?rev=345567&r1=345566&r2=345567&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/util/SingleRemoveSynchronizedAddLock.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/util/SingleRemoveSynchronizedAddLock.java Fri Nov 18 13:07:23 2005
@@ -196,6 +196,7 @@
*/
public synchronized boolean lockRemove() {
removeLocked=false;
+ removeEnabled=true;
if ( ( addLocked || ! dataAvailable ) && removeEnabled ) {
remover=Thread.currentThread();
do {
Modified: tomcat/container/tc5.5.x/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/webapps/docs/changelog.xml?rev=345567&r1=345566&r2=345567&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/webapps/docs/changelog.xml (original)
+++ tomcat/container/tc5.5.x/webapps/docs/changelog.xml Fri Nov 18 13:07:23 2005
@@ -145,6 +145,13 @@
<subsection name="Cluster">
<changelog>
<fix>
+ Fix that sendMessage signature at all DataSender subclasses must be changed.
+ Now pooled and async modes working as expected. (pero)
+ </fix>
+ <fix>
+ Fix that socket at o.a.c.cluster.tcp.FastAsyncSocketSender can be disconnect/connect. (pero)
+ </fix>
+ <fix>
Fix cluster module build.xml script for new svn repository structure (pero)
</fix>
<fix>
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org