You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2017/02/13 10:14:04 UTC
svn commit: r1782735 - in /qpid/java/trunk/broker-plugins:
amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
Author: rgodfrey
Date: Mon Feb 13 10:14:04 2017
New Revision: 1782735
URL: http://svn.apache.org/viewvc?rev=1782735&view=rev
Log:
QPID-7670 : WebSocket transport does not respect AMQP idle timeout
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java?rev=1782735&r1=1782734&r2=1782735&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java Mon Feb 13 10:14:04 2017
@@ -1314,7 +1314,7 @@ public class AMQPConnection_1_0Impl exte
FRAME_LOGGER.debug("SEND[{}|{}] : {}",
getNetwork().getRemoteAddress(),
amqFrame.getChannel(),
- amqFrame.getFrameBody());
+ amqFrame.getFrameBody() == null ? "<<HEARTBEAT>>" : amqFrame.getFrameBody());
int size = _frameWriter.send(amqFrame);
if (size > getMaxFrameSize())
Modified: qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1782735&r1=1782734&r2=1782735&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Mon Feb 13 10:14:04 2017
@@ -32,7 +32,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
@@ -54,20 +55,21 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.transport.MultiVersionProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.transport.MultiVersionProtocolEngineFactory;
import org.apache.qpid.server.transport.AcceptingTransport;
+import org.apache.qpid.server.transport.ByteBufferSender;
+import org.apache.qpid.server.transport.MultiVersionProtocolEngine;
+import org.apache.qpid.server.transport.MultiVersionProtocolEngineFactory;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.transport.SchedulingDelayNotificationListener;
import org.apache.qpid.server.transport.ServerNetworkConnection;
+import org.apache.qpid.server.transport.network.Ticker;
+import org.apache.qpid.server.transport.network.security.ssl.SSLUtil;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.server.transport.ByteBufferSender;
-import org.apache.qpid.server.transport.network.security.ssl.SSLUtil;
class WebSocketProvider implements AcceptingTransport
{
@@ -82,6 +84,11 @@ class WebSocketProvider implements Accep
private final MultiVersionProtocolEngineFactory _factory;
private Server _server;
+ private final List<ConnectionWrapper> _activeConnections = new CopyOnWriteArrayList<>();
+
+ private final WebSocketIdleTimeoutChecker _idleTimeoutChecker = new WebSocketIdleTimeoutChecker();
+ private final AtomicBoolean _closed = new AtomicBoolean();
+
WebSocketProvider(final Transport transport,
final SSLContext sslContext,
final AmqpPort<?> port,
@@ -101,11 +108,14 @@ class WebSocketProvider implements Accep
_port,
_transport);
+
}
@Override
public void start()
{
+ _idleTimeoutChecker.start();
+
_server = new Server();
final AbstractConnector connector;
@@ -238,7 +248,8 @@ class WebSocketProvider implements Accep
@Override
public void close()
{
-
+ _closed.set(true);
+ _idleTimeoutChecker.wakeup();
}
@Override
@@ -291,13 +302,14 @@ class WebSocketProvider implements Accep
buffer.dispose();
_connectionWrapper.doWrite();
-
}
finally
{
_protocolEngine.setIOThread(null);
}
}
+ _idleTimeoutChecker.wakeup();
+
}
@Override
@@ -308,8 +320,11 @@ class WebSocketProvider implements Accep
connection.setMaxBinaryMessageSize(0);
+ // Let AMQP do timeout handling
+ connection.setMaxIdleTime(0);
+
_connectionWrapper =
- new ConnectionWrapper(connection, _localAddress, _remoteAddress, _protocolEngine);
+ new ConnectionWrapper(connection, _localAddress, _remoteAddress, _protocolEngine, _threadPool);
_connectionWrapper.setPeerCertificate(_userCertificate);
_protocolEngine.setNetworkConnection(_connectionWrapper);
_protocolEngine.setWorkListener(new Action<ProtocolEngine>()
@@ -327,14 +342,16 @@ class WebSocketProvider implements Accep
});
}
});
-
-
+ _activeConnections.add(_connectionWrapper);
+ _idleTimeoutChecker.wakeup();
}
@Override
public void onClose(final int closeCode, final String message)
{
_protocolEngine.closed();
+ _activeConnections.remove(_connectionWrapper);
+ _idleTimeoutChecker.wakeup();
}
}
@@ -345,7 +362,8 @@ class WebSocketProvider implements Accep
private final SocketAddress _remoteAddress;
private final ConcurrentLinkedQueue<QpidByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
private final MultiVersionProtocolEngine _protocolEngine;
- private final AtomicLong _usedOutboundMessageSpace = new AtomicLong();
+ private final ThreadPool _threadPool;
+ private final Runnable _tickJob;
private Certificate _certificate;
private long _maxWriteIdleMillis;
@@ -353,12 +371,27 @@ class WebSocketProvider implements Accep
public ConnectionWrapper(final WebSocket.Connection connection,
final SocketAddress localAddress,
- final SocketAddress remoteAddress, final MultiVersionProtocolEngine protocolEngine)
+ final SocketAddress remoteAddress,
+ final MultiVersionProtocolEngine protocolEngine,
+ final ThreadPool threadPool)
{
_connection = connection;
_localAddress = localAddress;
_remoteAddress = remoteAddress;
_protocolEngine = protocolEngine;
+ _threadPool = threadPool;
+ _tickJob = new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ synchronized (ConnectionWrapper.this)
+ {
+ protocolEngine.getAggregateTicker().tick(System.currentTimeMillis());
+ doWrite();
+ }
+ }
+ };
}
@Override
@@ -502,7 +535,6 @@ class WebSocketProvider implements Accep
try
{
_connection.sendMessage(data, 0, size);
- _usedOutboundMessageSpace.set(0);
}
catch (IOException e)
{
@@ -526,7 +558,7 @@ class WebSocketProvider implements Accep
}
doWrite();
-
+ _idleTimeoutChecker.wakeup();
}
finally
{
@@ -534,5 +566,73 @@ class WebSocketProvider implements Accep
}
}
+
+
+ public void tick()
+ {
+ _threadPool.dispatch(_tickJob);
+ }
+ }
+
+
+
+ private class WebSocketIdleTimeoutChecker extends Thread
+ {
+
+ public WebSocketIdleTimeoutChecker()
+ {
+ setName("WebSocket Idle Checker: " + _port);
+ }
+
+ @Override
+ public void run()
+ {
+ while(!_closed.get())
+ {
+ ConnectionWrapper connectionToTick = null;
+ long currentTime = System.currentTimeMillis();
+ synchronized (this)
+ {
+ long nextTick = Long.MAX_VALUE;
+ for(ConnectionWrapper connection : _activeConnections)
+ {
+ ProtocolEngine engine = connection._protocolEngine;
+ final Ticker ticker = engine.getAggregateTicker();
+ long tick = ticker.getTimeToNextTick(currentTime);
+ if(tick <= 0)
+ {
+ connectionToTick = connection;
+ nextTick = -1;
+ break;
+ }
+ else if(tick < nextTick)
+ {
+ nextTick = tick;
+ }
+ }
+ if(nextTick > 0)
+ {
+ try
+ {
+ wait(nextTick);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+ if(connectionToTick != null)
+ {
+ connectionToTick.tick();
+ }
+ }
+ }
+
+ private synchronized void wakeup()
+ {
+ notifyAll();
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org