You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/11/03 18:31:00 UTC

svn commit: r710109 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport: TransportFactory.java WriteTimeoutFilter.java tcp/TcpBufferedOutputStream.java tcp/TcpTransport.java

Author: gtully
Date: Mon Nov  3 09:30:59 2008
New Revision: 710109

URL: http://svn.apache.org/viewvc?rev=710109&view=rev
Log:
apply patch for AMQ-1993, thanks

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java?rev=710109&r1=710108&r2=710109&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java Mon Nov  3 09:30:59 2008
@@ -26,8 +26,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
-import javax.net.ssl.SSLContext;
-
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.SslContext;
@@ -44,6 +42,7 @@
     private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
     private static final ConcurrentHashMap<String, TransportFactory> TRANSPORT_FACTORYS = new ConcurrentHashMap<String, TransportFactory>();
 
+    private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout";
     private static final String THREAD_NAME_FILTER = "threadName";
     
     public abstract TransportServer doBind(URI location) throws IOException;
@@ -265,6 +264,11 @@
      * @throws Exception
      */
     public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
+        if (options.containsKey(WRITE_TIMEOUT_FILTER)) {
+            transport = new WriteTimeoutFilter(transport);
+            String soWriteTimeout = (String)options.get(WRITE_TIMEOUT_FILTER);
+            if (soWriteTimeout!=null) ((WriteTimeoutFilter)transport).setWriteTimeout(Long.parseLong(soWriteTimeout));
+        }
         if (options.containsKey(THREAD_NAME_FILTER)) {
             transport = new ThreadNameFilter(transport);
         }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java?rev=710109&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java Mon Nov  3 09:30:59 2008
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
+import org.apache.activemq.transport.tcp.TcpTransport;
+
+/**
+ * This filter implements write timeouts for socket write operations.
+ * When using blocking IO, the Java implementation doesn't have an explicit flag
+ * to set a timeout, and can cause operations to block forever (or until the TCP stack implementation times out the retransmissions,
+ * which is usually around 13-30 minutes).<br/>
+ * To enable this transport, in the transport URI, simpley add<br/>
+ * <code>transport.soWriteTimeout=<value in millis></code>.<br/>
+ * For example (15 second timeout on write operations to the socket):</br>
+ * <pre><code>
+ * &lt;transportConnector 
+ *     name=&quot;tcp1&quot; 
+ *     uri=&quot;tcp://127.0.0.1:61616?transport.soTimeout=10000&amp;transport.soWriteTimeout=15000"
+ * /&gt;
+ * </code></pre><br/>
+ * For example (enable default timeout on the socket):</br>
+ * <pre><code>
+ * &lt;transportConnector 
+ *     name=&quot;tcp1&quot; 
+ *     uri=&quot;tcp://127.0.0.1:61616?transport.soTimeout=10000&amp;transport.soWriteTimeout=15000"
+ * /&gt;
+ * </code></pre>
+ * @author Filip Hanik
+ *
+ */
+public class WriteTimeoutFilter extends TransportFilter {
+	
+    protected static ConcurrentLinkedQueue<WriteTimeoutFilter> writers = new ConcurrentLinkedQueue<WriteTimeoutFilter>();
+    protected static AtomicInteger messageCounter = new AtomicInteger(0);
+    protected static TimeoutThread timeoutThread = new TimeoutThread(); 
+    
+	protected long writeTimeout = -1;
+	
+    public WriteTimeoutFilter(Transport next) {
+        super(next);
+    }
+
+    @Override
+    public void oneway(Object command) throws IOException {
+        try {
+            registerWrite(this);
+            super.oneway(command);
+        } catch (IOException x) {
+            deRegisterWrite(this,true,x);
+            throw x;
+        } finally {
+            deRegisterWrite(this,false,null);
+        }
+    }
+    
+	public long getWriteTimeout() {
+		return writeTimeout;
+	}
+
+	public void setWriteTimeout(long writeTimeout) {
+		this.writeTimeout = writeTimeout;
+	}
+	
+	protected TcpBufferedOutputStream getWriter() {
+	    return next.narrow(TcpBufferedOutputStream.class);
+	}
+	
+	protected static void registerWrite(WriteTimeoutFilter filter) {
+	    writers.add(filter);
+	}
+	
+    protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail, IOException iox) {
+        boolean result = writers.remove(filter); 
+        if (result) {
+            if (fail) {
+                IOException ex = (iox!=null)?iox:new IOException("Forced write timeout for:"+filter.getNext().getRemoteAddress());
+                filter.getTransportListener().onException(ex);
+            }
+        }
+        return result;
+    }
+    
+    @Override
+    public void start() throws Exception {
+        super.start();
+    }
+    
+    @Override
+    public void stop() throws Exception {
+        super.stop();
+    }
+    
+    protected static class TimeoutThread extends Thread {
+        static AtomicInteger instance = new AtomicInteger(0);
+        boolean run = true;
+        public TimeoutThread() {
+            setName("WriteTimeoutFilter-Timeout-"+instance.incrementAndGet());
+            setDaemon(true);
+            setPriority(Thread.MIN_PRIORITY);
+            start();
+        }
+
+        
+        public void run() {
+            while (run) {
+                if (!interrupted()) {
+                    Iterator<WriteTimeoutFilter> filters = writers.iterator();
+                    while (run && filters.hasNext()) { 
+                        WriteTimeoutFilter filter = filters.next();
+                        if (filter.getWriteTimeout()<=0) continue; //no timeout set
+                        long writeStart = filter.getWriter().getWriteTimestamp();
+                        long delta = (filter.getWriter().isWriting() && writeStart>0)?System.currentTimeMillis() - writeStart:-1;
+                        if (delta>filter.getWriteTimeout()) {
+                            WriteTimeoutFilter.deRegisterWrite(filter, true,null);
+                        }//if timeout
+                    }//while
+                }//if interrupted
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException x) {
+                    //do nothing
+                }
+            }
+        }
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java?rev=710109&r1=710108&r2=710109&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java Mon Nov  3 09:30:59 2008
@@ -32,6 +32,8 @@
     private byte[] buffer;
     private int bufferlen;
     private int count;
+    private volatile long writeTimestamp = -1;//concurrent reads of this value
+    
 
     /**
      * Constructor
@@ -89,7 +91,12 @@
                 System.arraycopy(b, off, buffer, count, len);
                 count += len;
             } else {
-                out.write(b, off, len);
+                try {
+                    writeTimestamp = System.currentTimeMillis();
+                    out.write(b, off, len);
+                } finally {
+                    writeTimestamp = System.currentTimeMillis();
+                }
             }
         }
     }
@@ -103,7 +110,12 @@
      */
     public void flush() throws IOException {
         if (count > 0 && out != null) {
-            out.write(buffer, 0, count);
+            try {
+                writeTimestamp = System.currentTimeMillis();
+                out.write(buffer, 0, count);
+            } finally {
+            	writeTimestamp = -1;
+            }
             count = 0;
         }
     }
@@ -117,4 +129,12 @@
         super.close();
     }
 
+    public boolean isWriting() {
+        return writeTimestamp > 0;
+    }
+    
+    public long getWriteTimestamp() {
+    	return writeTimestamp;
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=710109&r1=710108&r2=710109&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Mon Nov  3 09:30:59 2008
@@ -69,6 +69,7 @@
     protected Socket socket;
     protected DataOutputStream dataOut;
     protected DataInputStream dataIn;
+    protected TcpBufferedOutputStream buffOut = null;
     /**
      * trace=true -> the Transport stack where this TcpTransport
      * object will be, will have a TransportLogger layer
@@ -505,7 +506,7 @@
     protected void initializeStreams() throws Exception {
         TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
         this.dataIn = new DataInputStream(buffIn);
-        TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
+        buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
         this.dataOut = new DataOutputStream(buffOut);
     }
 
@@ -533,9 +534,12 @@
     public <T> T narrow(Class<T> target) {
         if (target == Socket.class) {
             return target.cast(socket);
+        } else if ( target == TcpBufferedOutputStream.class) {
+            return target.cast(buffOut);
         }
         return super.narrow(target);
     }
+    
 
     static {
         SOCKET_CLOSE =   new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {