You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/12/15 20:51:16 UTC

svn commit: r726785 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java

Author: gtully
Date: Mon Dec 15 11:51:16 2008
New Revision: 726785

URL: http://svn.apache.org/viewvc?rev=726785&view=rev
Log:
resolve AMQ-2006 - patch applied with thanks

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java?rev=726785&r1=726784&r2=726785&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java Mon Dec 15 11:51:16 2008
@@ -18,6 +18,7 @@
 package org.apache.activemq.transport;
 
 import java.io.IOException;
+import java.net.Socket;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -25,7 +26,8 @@
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
-import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * This filter implements write timeouts for socket write operations.
@@ -52,13 +54,16 @@
  *
  */
 public class WriteTimeoutFilter extends TransportFilter {
-	
+
+    private static final Log LOG = LogFactory.getLog(WriteTimeoutFilter.class);
     protected static ConcurrentLinkedQueue<WriteTimeoutFilter> writers = new ConcurrentLinkedQueue<WriteTimeoutFilter>();
     protected static AtomicInteger messageCounter = new AtomicInteger(0);
     protected static TimeoutThread timeoutThread = new TimeoutThread(); 
     
-	protected long writeTimeout = -1;
-	
+    protected static long sleep = 5000l;
+
+    protected long writeTimeout = -1;
+    
     public WriteTimeoutFilter(Transport next) {
         super(next);
     }
@@ -69,35 +74,56 @@
             registerWrite(this);
             super.oneway(command);
         } catch (IOException x) {
-            deRegisterWrite(this,true,x);
             throw x;
         } finally {
             deRegisterWrite(this,false,null);
         }
     }
     
-	public long getWriteTimeout() {
-		return writeTimeout;
-	}
-
-	public void setWriteTimeout(long writeTimeout) {
-		this.writeTimeout = writeTimeout;
-	}
-	
-	protected TcpBufferedOutputStream getWriter() {
-	    return next.narrow(TcpBufferedOutputStream.class);
-	}
-	
-	protected static void registerWrite(WriteTimeoutFilter filter) {
-	    writers.add(filter);
-	}
-	
+    public long getWriteTimeout() {
+        return writeTimeout;
+    }
+
+    public void setWriteTimeout(long writeTimeout) {
+        this.writeTimeout = writeTimeout;
+    }
+    
+    public static long getSleep() {
+        return sleep;
+    }
+
+    public static void setSleep(long sleep) {
+        WriteTimeoutFilter.sleep = sleep;
+    }
+
+    
+    protected TcpBufferedOutputStream getWriter() {
+        return next.narrow(TcpBufferedOutputStream.class);
+    }
+    
+    protected Socket getSocket() {
+        return next.narrow(Socket.class);
+    }
+    
+    protected static void registerWrite(WriteTimeoutFilter filter) {
+        writers.add(filter);
+    }
+    
     protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail, IOException iox) {
         boolean result = writers.remove(filter); 
         if (result) {
             if (fail) {
-                IOException ex = (iox!=null)?iox:new IOException("Forced write timeout for:"+filter.getNext().getRemoteAddress());
-                filter.getTransportListener().onException(ex);
+                String message = "Forced write timeout for:"+filter.getNext().getRemoteAddress();
+                LOG.warn(message);
+                Socket sock = filter.getSocket();
+                if (sock==null) {
+                    LOG.error("Destination socket is null, unable to close socket.("+message+")");
+                } else {
+                    try {
+                        sock.close();
+                    }catch (IOException ignore) {
+                    }
+                }
             }
         }
         return result;
@@ -126,22 +152,31 @@
         
         public void run() {
             while (run) {
-                if (!interrupted()) {
-                    Iterator<WriteTimeoutFilter> filters = writers.iterator();
-                    while (run && filters.hasNext()) { 
-                        WriteTimeoutFilter filter = filters.next();
-                        if (filter.getWriteTimeout()<=0) continue; //no timeout set
-                        long writeStart = filter.getWriter().getWriteTimestamp();
-                        long delta = (filter.getWriter().isWriting() && writeStart>0)?System.currentTimeMillis() - writeStart:-1;
-                        if (delta>filter.getWriteTimeout()) {
-                            WriteTimeoutFilter.deRegisterWrite(filter, true,null);
-                        }//if timeout
-                    }//while
-                }//if interrupted
+            	boolean error = false;
                 try {
-                    Thread.sleep(5000);
-                } catch (InterruptedException x) {
-                    //do nothing
+                	if (!interrupted()) {
+                		Iterator<WriteTimeoutFilter> filters = writers.iterator();
+                	    while (run && filters.hasNext()) { 
+                            WriteTimeoutFilter filter = filters.next();
+                            if (filter.getWriteTimeout()<=0) continue; //no timeout set
+                            long writeStart = filter.getWriter().getWriteTimestamp();
+                            long delta = (filter.getWriter().isWriting() && writeStart>0)?System.currentTimeMillis() - writeStart:-1;
+                            if (delta>filter.getWriteTimeout()) {
+                                WriteTimeoutFilter.deRegisterWrite(filter, true,null);
+                            }//if timeout
+                        }//while
+                    }//if interrupted
+                    try {
+                        Thread.sleep(getSleep());
+                        error = false;
+                    } catch (InterruptedException x) {
+                        //do nothing
+                    }
+                }catch (Throwable t) { //make sure this thread never dies
+                    if (!error) { //use error flag to avoid filling up the logs
+                        LOG.error("WriteTimeout thread unable validate existing sockets.",t);
+                        error = true;
+                    }
                 }
             }
         }