You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/12/15 20:51:16 UTC
svn commit: r726785 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java
Author: gtully
Date: Mon Dec 15 11:51:16 2008
New Revision: 726785
URL: http://svn.apache.org/viewvc?rev=726785&view=rev
Log:
resolve AMQ-2006 - patch applied with thanks
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java?rev=726785&r1=726784&r2=726785&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java Mon Dec 15 11:51:16 2008
@@ -18,6 +18,7 @@
package org.apache.activemq.transport;
import java.io.IOException;
+import java.net.Socket;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
@@ -25,7 +26,8 @@
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
-import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* This filter implements write timeouts for socket write operations.
@@ -52,13 +54,16 @@
*
*/
public class WriteTimeoutFilter extends TransportFilter {
-
+
+ private static final Log LOG = LogFactory.getLog(WriteTimeoutFilter.class);
protected static ConcurrentLinkedQueue<WriteTimeoutFilter> writers = new ConcurrentLinkedQueue<WriteTimeoutFilter>();
protected static AtomicInteger messageCounter = new AtomicInteger(0);
protected static TimeoutThread timeoutThread = new TimeoutThread();
- protected long writeTimeout = -1;
-
+ protected static long sleep = 5000l;
+
+ protected long writeTimeout = -1;
+
public WriteTimeoutFilter(Transport next) {
super(next);
}
@@ -69,35 +74,56 @@
registerWrite(this);
super.oneway(command);
} catch (IOException x) {
- deRegisterWrite(this,true,x);
throw x;
} finally {
deRegisterWrite(this,false,null);
}
}
- public long getWriteTimeout() {
- return writeTimeout;
- }
-
- public void setWriteTimeout(long writeTimeout) {
- this.writeTimeout = writeTimeout;
- }
-
- protected TcpBufferedOutputStream getWriter() {
- return next.narrow(TcpBufferedOutputStream.class);
- }
-
- protected static void registerWrite(WriteTimeoutFilter filter) {
- writers.add(filter);
- }
-
+ public long getWriteTimeout() {
+ return writeTimeout;
+ }
+
+ public void setWriteTimeout(long writeTimeout) {
+ this.writeTimeout = writeTimeout;
+ }
+
+ public static long getSleep() {
+ return sleep;
+ }
+
+ public static void setSleep(long sleep) {
+ WriteTimeoutFilter.sleep = sleep;
+ }
+
+
+ protected TcpBufferedOutputStream getWriter() {
+ return next.narrow(TcpBufferedOutputStream.class);
+ }
+
+ protected Socket getSocket() {
+ return next.narrow(Socket.class);
+ }
+
+ protected static void registerWrite(WriteTimeoutFilter filter) {
+ writers.add(filter);
+ }
+
protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail, IOException iox) {
boolean result = writers.remove(filter);
if (result) {
if (fail) {
- IOException ex = (iox!=null)?iox:new IOException("Forced write timeout for:"+filter.getNext().getRemoteAddress());
- filter.getTransportListener().onException(ex);
+ String message = "Forced write timeout for:"+filter.getNext().getRemoteAddress();
+ LOG.warn(message);
+ Socket sock = filter.getSocket();
+ if (sock==null) {
+ LOG.error("Destination socket is null, unable to close socket.("+message+")");
+ } else {
+ try {
+ sock.close();
+ }catch (IOException ignore) {
+ }
+ }
}
}
return result;
@@ -126,22 +152,31 @@
public void run() {
while (run) {
- if (!interrupted()) {
- Iterator<WriteTimeoutFilter> filters = writers.iterator();
- while (run && filters.hasNext()) {
- WriteTimeoutFilter filter = filters.next();
- if (filter.getWriteTimeout()<=0) continue; //no timeout set
- long writeStart = filter.getWriter().getWriteTimestamp();
- long delta = (filter.getWriter().isWriting() && writeStart>0)?System.currentTimeMillis() - writeStart:-1;
- if (delta>filter.getWriteTimeout()) {
- WriteTimeoutFilter.deRegisterWrite(filter, true,null);
- }//if timeout
- }//while
- }//if interrupted
+ boolean error = false;
try {
- Thread.sleep(5000);
- } catch (InterruptedException x) {
- //do nothing
+ if (!interrupted()) {
+ Iterator<WriteTimeoutFilter> filters = writers.iterator();
+ while (run && filters.hasNext()) {
+ WriteTimeoutFilter filter = filters.next();
+ if (filter.getWriteTimeout()<=0) continue; //no timeout set
+ long writeStart = filter.getWriter().getWriteTimestamp();
+ long delta = (filter.getWriter().isWriting() && writeStart>0)?System.currentTimeMillis() - writeStart:-1;
+ if (delta>filter.getWriteTimeout()) {
+ WriteTimeoutFilter.deRegisterWrite(filter, true,null);
+ }//if timeout
+ }//while
+ }//if interrupted
+ try {
+ Thread.sleep(getSleep());
+ error = false;
+ } catch (InterruptedException x) {
+ //do nothing
+ }
+ }catch (Throwable t) { //make sure this thread never dies
+ if (!error) { //use error flag to avoid filling up the logs
+ LOG.error("WriteTimeout thread unable validate existing sockets.",t);
+ error = true;
+ }
}
}
}