You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/11/03 18:31:00 UTC
svn commit: r710109 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport:
TransportFactory.java WriteTimeoutFilter.java
tcp/TcpBufferedOutputStream.java tcp/TcpTransport.java
Author: gtully
Date: Mon Nov 3 09:30:59 2008
New Revision: 710109
URL: http://svn.apache.org/viewvc?rev=710109&view=rev
Log:
apply patch for AMQ-1993, thanks
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java?rev=710109&r1=710108&r2=710109&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java Mon Nov 3 09:30:59 2008
@@ -26,8 +26,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
-import javax.net.ssl.SSLContext;
-
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.SslContext;
@@ -44,6 +42,7 @@
private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
private static final ConcurrentHashMap<String, TransportFactory> TRANSPORT_FACTORYS = new ConcurrentHashMap<String, TransportFactory>();
+ private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout";
private static final String THREAD_NAME_FILTER = "threadName";
public abstract TransportServer doBind(URI location) throws IOException;
@@ -265,6 +264,11 @@
* @throws Exception
*/
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
+ if (options.containsKey(WRITE_TIMEOUT_FILTER)) {
+ transport = new WriteTimeoutFilter(transport);
+ String soWriteTimeout = (String)options.get(WRITE_TIMEOUT_FILTER);
+ if (soWriteTimeout!=null) ((WriteTimeoutFilter)transport).setWriteTimeout(Long.parseLong(soWriteTimeout));
+ }
if (options.containsKey(THREAD_NAME_FILTER)) {
transport = new ThreadNameFilter(transport);
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java?rev=710109&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java Mon Nov 3 09:30:59 2008
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
+import org.apache.activemq.transport.tcp.TcpTransport;
+
+/**
+ * This filter implements write timeouts for socket write operations.
+ * When using blocking IO, the Java implementation doesn't have an explicit flag
+ * to set a timeout, and can cause operations to block forever (or until the TCP stack implementation times out the retransmissions,
+ * which is usually around 13-30 minutes).<br/>
+ * To enable this transport, in the transport URI, simpley add<br/>
+ * <code>transport.soWriteTimeout=<value in millis></code>.<br/>
+ * For example (15 second timeout on write operations to the socket):</br>
+ * <pre><code>
+ * <transportConnector
+ * name="tcp1"
+ * uri="tcp://127.0.0.1:61616?transport.soTimeout=10000&transport.soWriteTimeout=15000"
+ * />
+ * </code></pre><br/>
+ * For example (enable default timeout on the socket):</br>
+ * <pre><code>
+ * <transportConnector
+ * name="tcp1"
+ * uri="tcp://127.0.0.1:61616?transport.soTimeout=10000&transport.soWriteTimeout=15000"
+ * />
+ * </code></pre>
+ * @author Filip Hanik
+ *
+ */
+public class WriteTimeoutFilter extends TransportFilter {
+
+ protected static ConcurrentLinkedQueue<WriteTimeoutFilter> writers = new ConcurrentLinkedQueue<WriteTimeoutFilter>();
+ protected static AtomicInteger messageCounter = new AtomicInteger(0);
+ protected static TimeoutThread timeoutThread = new TimeoutThread();
+
+ protected long writeTimeout = -1;
+
+ public WriteTimeoutFilter(Transport next) {
+ super(next);
+ }
+
+ @Override
+ public void oneway(Object command) throws IOException {
+ try {
+ registerWrite(this);
+ super.oneway(command);
+ } catch (IOException x) {
+ deRegisterWrite(this,true,x);
+ throw x;
+ } finally {
+ deRegisterWrite(this,false,null);
+ }
+ }
+
+ public long getWriteTimeout() {
+ return writeTimeout;
+ }
+
+ public void setWriteTimeout(long writeTimeout) {
+ this.writeTimeout = writeTimeout;
+ }
+
+ protected TcpBufferedOutputStream getWriter() {
+ return next.narrow(TcpBufferedOutputStream.class);
+ }
+
+ protected static void registerWrite(WriteTimeoutFilter filter) {
+ writers.add(filter);
+ }
+
+ protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail, IOException iox) {
+ boolean result = writers.remove(filter);
+ if (result) {
+ if (fail) {
+ IOException ex = (iox!=null)?iox:new IOException("Forced write timeout for:"+filter.getNext().getRemoteAddress());
+ filter.getTransportListener().onException(ex);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void start() throws Exception {
+ super.start();
+ }
+
+ @Override
+ public void stop() throws Exception {
+ super.stop();
+ }
+
+ protected static class TimeoutThread extends Thread {
+ static AtomicInteger instance = new AtomicInteger(0);
+ boolean run = true;
+ public TimeoutThread() {
+ setName("WriteTimeoutFilter-Timeout-"+instance.incrementAndGet());
+ setDaemon(true);
+ setPriority(Thread.MIN_PRIORITY);
+ start();
+ }
+
+
+ public void run() {
+ while (run) {
+ if (!interrupted()) {
+ Iterator<WriteTimeoutFilter> filters = writers.iterator();
+ while (run && filters.hasNext()) {
+ WriteTimeoutFilter filter = filters.next();
+ if (filter.getWriteTimeout()<=0) continue; //no timeout set
+ long writeStart = filter.getWriter().getWriteTimestamp();
+ long delta = (filter.getWriter().isWriting() && writeStart>0)?System.currentTimeMillis() - writeStart:-1;
+ if (delta>filter.getWriteTimeout()) {
+ WriteTimeoutFilter.deRegisterWrite(filter, true,null);
+ }//if timeout
+ }//while
+ }//if interrupted
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException x) {
+ //do nothing
+ }
+ }
+ }
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java?rev=710109&r1=710108&r2=710109&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java Mon Nov 3 09:30:59 2008
@@ -32,6 +32,8 @@
private byte[] buffer;
private int bufferlen;
private int count;
+ private volatile long writeTimestamp = -1;//concurrent reads of this value
+
/**
* Constructor
@@ -89,7 +91,12 @@
System.arraycopy(b, off, buffer, count, len);
count += len;
} else {
- out.write(b, off, len);
+ try {
+ writeTimestamp = System.currentTimeMillis();
+ out.write(b, off, len);
+ } finally {
+ writeTimestamp = System.currentTimeMillis();
+ }
}
}
}
@@ -103,7 +110,12 @@
*/
public void flush() throws IOException {
if (count > 0 && out != null) {
- out.write(buffer, 0, count);
+ try {
+ writeTimestamp = System.currentTimeMillis();
+ out.write(buffer, 0, count);
+ } finally {
+ writeTimestamp = -1;
+ }
count = 0;
}
}
@@ -117,4 +129,12 @@
super.close();
}
+ public boolean isWriting() {
+ return writeTimestamp > 0;
+ }
+
+ public long getWriteTimestamp() {
+ return writeTimestamp;
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=710109&r1=710108&r2=710109&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Mon Nov 3 09:30:59 2008
@@ -69,6 +69,7 @@
protected Socket socket;
protected DataOutputStream dataOut;
protected DataInputStream dataIn;
+ protected TcpBufferedOutputStream buffOut = null;
/**
* trace=true -> the Transport stack where this TcpTransport
* object will be, will have a TransportLogger layer
@@ -505,7 +506,7 @@
protected void initializeStreams() throws Exception {
TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
this.dataIn = new DataInputStream(buffIn);
- TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
+ buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
this.dataOut = new DataOutputStream(buffOut);
}
@@ -533,9 +534,12 @@
public <T> T narrow(Class<T> target) {
if (target == Socket.class) {
return target.cast(socket);
+ } else if ( target == TcpBufferedOutputStream.class) {
+ return target.cast(buffOut);
}
return super.narrow(target);
}
+
static {
SOCKET_CLOSE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {