You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/02/14 12:38:46 UTC
svn commit: r1782953 - in /qpid/java/branches/6.1.x: ./
broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/
broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/
Author: orudyy
Date: Tue Feb 14 12:38:46 2017
New Revision: 1782953
URL: http://svn.apache.org/viewvc?rev=1782953&view=rev
Log:
QPID-7670 : WebSocket transport does not respect AMQP idle timeout
Merged from trunk using:
svn merge -c 1782735 ^/qpid/java/trunk
Merge conflicts were resolved manually.
Modified:
qpid/java/branches/6.1.x/ (props changed)
qpid/java/branches/6.1.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
qpid/java/branches/6.1.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
Propchange: qpid/java/branches/6.1.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 14 12:38:46 2017
@@ -9,5 +9,5 @@
/qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
/qpid/branches/java-network-refactor/qpid/java:805429-821809
/qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk:1766544,1766547,1766553,1766666,1766796-1766797,1766806,1767251,1767267-1767268,1767275,1767310,1767326,1767329,1767332,1767514,1767523,1767738,1767825,1767847-1767849,1767882,1767909,1767914,1768016-1768017,1768065,1768643,1768704,1768854,1768875,1768914,1768963,1768967,1768976,1769007,1769009,1769087,1769138-1769139,1769597,1769879,1770236,1770716,1772241,1772365,1772574,1773057,1774039,1774446,1774564,1774885,1775087,1775100,1777939
+/qpid/java/trunk:1766544,1766547,1766553,1766666,1766796-1766797,1766806,1767251,1767267-1767268,1767275,1767310,1767326,1767329,1767332,1767514,1767523,1767738,1767825,1767847-1767849,1767882,1767909,1767914,1768016-1768017,1768065,1768643,1768704,1768854,1768875,1768914,1768963,1768967,1768976,1769007,1769009,1769087,1769138-1769139,1769597,1769879,1770236,1770716,1772241,1772365,1772574,1773057,1774039,1774446,1774564,1774885,1775087,1775100,1777939,1782735
/qpid/trunk/qpid:796646-796653
Modified: qpid/java/branches/6.1.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.1.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1782953&r1=1782952&r2=1782953&view=diff
==============================================================================
--- qpid/java/branches/6.1.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/branches/6.1.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Tue Feb 14 12:38:46 2017
@@ -1304,7 +1304,7 @@ public class AMQPConnection_1_0 extends
FRAME_LOGGER.debug("SEND[{}|{}] : {}",
getNetwork().getRemoteAddress(),
amqFrame.getChannel(),
- amqFrame.getFrameBody());
+ amqFrame.getFrameBody() == null ? "<<HEARTBEAT>>" : amqFrame.getFrameBody());
int size = _frameWriter.send(amqFrame);
if (size > getMaxFrameSize())
Modified: qpid/java/branches/6.1.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.1.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1782953&r1=1782952&r2=1782953&view=diff
==============================================================================
--- qpid/java/branches/6.1.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/java/branches/6.1.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Tue Feb 14 12:38:46 2017
@@ -32,6 +32,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLContext;
@@ -54,13 +56,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.transport.MultiVersionProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.transport.MultiVersionProtocolEngineFactory;
import org.apache.qpid.server.transport.AcceptingTransport;
+import org.apache.qpid.server.transport.MultiVersionProtocolEngine;
+import org.apache.qpid.server.transport.MultiVersionProtocolEngineFactory;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.transport.SchedulingDelayNotificationListener;
import org.apache.qpid.server.transport.ServerNetworkConnection;
@@ -68,6 +70,7 @@ import org.apache.qpid.server.util.Actio
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+import org.apache.qpid.transport.network.Ticker;
class WebSocketProvider implements AcceptingTransport
{
@@ -83,6 +86,11 @@ class WebSocketProvider implements Accep
private Server _server;
private final long _outboundMessageBufferLimit;
+ private final List<ConnectionWrapper> _activeConnections = new CopyOnWriteArrayList<>();
+
+ private final WebSocketIdleTimeoutChecker _idleTimeoutChecker = new WebSocketIdleTimeoutChecker();
+ private final AtomicBoolean _closed = new AtomicBoolean();
+
WebSocketProvider(final Transport transport,
final SSLContext sslContext,
final AmqpPort<?> port,
@@ -104,11 +112,14 @@ class WebSocketProvider implements Accep
_port,
_transport);
+
}
@Override
public void start()
{
+ _idleTimeoutChecker.start();
+
_server = new Server();
final AbstractConnector connector;
@@ -241,7 +252,8 @@ class WebSocketProvider implements Accep
@Override
public void close()
{
-
+ _closed.set(true);
+ _idleTimeoutChecker.wakeup();
}
@Override
@@ -295,7 +307,6 @@ class WebSocketProvider implements Accep
buffer.dispose();
_connectionWrapper.doWrite();
-
_protocolEngine.setMessageAssignmentSuspended(false, true);
}
finally
@@ -303,6 +314,8 @@ class WebSocketProvider implements Accep
_protocolEngine.setIOThread(null);
}
}
+ _idleTimeoutChecker.wakeup();
+
}
@Override
@@ -313,8 +326,11 @@ class WebSocketProvider implements Accep
connection.setMaxBinaryMessageSize(0);
+ // Let AMQP do timeout handling
+ connection.setMaxIdleTime(0);
+
_connectionWrapper =
- new ConnectionWrapper(connection, _localAddress, _remoteAddress, _protocolEngine);
+ new ConnectionWrapper(connection, _localAddress, _remoteAddress, _protocolEngine, _threadPool);
_connectionWrapper.setPeerCertificate(_userCertificate);
_protocolEngine.setNetworkConnection(_connectionWrapper);
_protocolEngine.setWorkListener(new Action<ProtocolEngine>()
@@ -332,14 +348,16 @@ class WebSocketProvider implements Accep
});
}
});
-
-
+ _activeConnections.add(_connectionWrapper);
+ _idleTimeoutChecker.wakeup();
}
@Override
public void onClose(final int closeCode, final String message)
{
_protocolEngine.closed();
+ _activeConnections.remove(_connectionWrapper);
+ _idleTimeoutChecker.wakeup();
}
}
@@ -351,6 +369,8 @@ class WebSocketProvider implements Accep
private final ConcurrentLinkedQueue<QpidByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
private final MultiVersionProtocolEngine _protocolEngine;
private final AtomicLong _usedOutboundMessageSpace = new AtomicLong();
+ private final ThreadPool _threadPool;
+ private final Runnable _tickJob;
private Certificate _certificate;
private long _maxWriteIdleMillis;
@@ -358,12 +378,27 @@ class WebSocketProvider implements Accep
public ConnectionWrapper(final WebSocket.Connection connection,
final SocketAddress localAddress,
- final SocketAddress remoteAddress, final MultiVersionProtocolEngine protocolEngine)
+ final SocketAddress remoteAddress,
+ final MultiVersionProtocolEngine protocolEngine,
+ final ThreadPool threadPool)
{
_connection = connection;
_localAddress = localAddress;
_remoteAddress = remoteAddress;
_protocolEngine = protocolEngine;
+ _threadPool = threadPool;
+ _tickJob = new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ synchronized (ConnectionWrapper.this)
+ {
+ protocolEngine.getAggregateTicker().tick(System.currentTimeMillis());
+ doWrite();
+ }
+ }
+ };
}
@Override
@@ -516,7 +551,6 @@ class WebSocketProvider implements Accep
try
{
_connection.sendMessage(data, 0, size);
- _usedOutboundMessageSpace.set(0);
}
catch (IOException e)
{
@@ -541,7 +575,7 @@ class WebSocketProvider implements Accep
}
doWrite();
-
+ _idleTimeoutChecker.wakeup();
_protocolEngine.setMessageAssignmentSuspended(false, true);
}
finally
@@ -550,5 +584,73 @@ class WebSocketProvider implements Accep
}
}
+
+
+ public void tick()
+ {
+ _threadPool.dispatch(_tickJob);
+ }
+ }
+
+
+
+ private class WebSocketIdleTimeoutChecker extends Thread
+ {
+
+ public WebSocketIdleTimeoutChecker()
+ {
+ setName("WebSocket Idle Checker: " + _port);
+ }
+
+ @Override
+ public void run()
+ {
+ while(!_closed.get())
+ {
+ ConnectionWrapper connectionToTick = null;
+ long currentTime = System.currentTimeMillis();
+ synchronized (this)
+ {
+ long nextTick = Long.MAX_VALUE;
+ for(ConnectionWrapper connection : _activeConnections)
+ {
+ ProtocolEngine engine = connection._protocolEngine;
+ final Ticker ticker = engine.getAggregateTicker();
+ long tick = ticker.getTimeToNextTick(currentTime);
+ if(tick <= 0)
+ {
+ connectionToTick = connection;
+ nextTick = -1;
+ break;
+ }
+ else if(tick < nextTick)
+ {
+ nextTick = tick;
+ }
+ }
+ if(nextTick > 0)
+ {
+ try
+ {
+ wait(nextTick);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+ if(connectionToTick != null)
+ {
+ connectionToTick.tick();
+ }
+ }
+ }
+
+ private synchronized void wakeup()
+ {
+ notifyAll();
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org