You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2017/02/13 10:14:04 UTC

svn commit: r1782735 - in /qpid/java/trunk/broker-plugins: amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java

Author: rgodfrey
Date: Mon Feb 13 10:14:04 2017
New Revision: 1782735

URL: http://svn.apache.org/viewvc?rev=1782735&view=rev
Log:
QPID-7670 : WebSocket transport does not respect AMQP idle timeout

Modified:
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
    qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java?rev=1782735&r1=1782734&r2=1782735&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java Mon Feb 13 10:14:04 2017
@@ -1314,7 +1314,7 @@ public class AMQPConnection_1_0Impl exte
         FRAME_LOGGER.debug("SEND[{}|{}] : {}",
                            getNetwork().getRemoteAddress(),
                            amqFrame.getChannel(),
-                           amqFrame.getFrameBody());
+                           amqFrame.getFrameBody() == null ? "<<HEARTBEAT>>" : amqFrame.getFrameBody());
 
         int size = _frameWriter.send(amqFrame);
         if (size > getMaxFrameSize())

Modified: qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1782735&r1=1782734&r2=1782735&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Mon Feb 13 10:14:04 2017
@@ -32,7 +32,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
@@ -54,20 +55,21 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.transport.MultiVersionProtocolEngine;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.transport.MultiVersionProtocolEngineFactory;
 import org.apache.qpid.server.transport.AcceptingTransport;
+import org.apache.qpid.server.transport.ByteBufferSender;
+import org.apache.qpid.server.transport.MultiVersionProtocolEngine;
+import org.apache.qpid.server.transport.MultiVersionProtocolEngineFactory;
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.transport.SchedulingDelayNotificationListener;
 import org.apache.qpid.server.transport.ServerNetworkConnection;
+import org.apache.qpid.server.transport.network.Ticker;
+import org.apache.qpid.server.transport.network.security.ssl.SSLUtil;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.server.transport.ByteBufferSender;
-import org.apache.qpid.server.transport.network.security.ssl.SSLUtil;
 
 class WebSocketProvider implements AcceptingTransport
 {
@@ -82,6 +84,11 @@ class WebSocketProvider implements Accep
     private final MultiVersionProtocolEngineFactory _factory;
     private Server _server;
 
+    private final List<ConnectionWrapper> _activeConnections = new CopyOnWriteArrayList<>();
+
+    private final WebSocketIdleTimeoutChecker _idleTimeoutChecker = new WebSocketIdleTimeoutChecker();
+    private final AtomicBoolean _closed = new AtomicBoolean();
+
     WebSocketProvider(final Transport transport,
                       final SSLContext sslContext,
                       final AmqpPort<?> port,
@@ -101,11 +108,14 @@ class WebSocketProvider implements Accep
                         _port,
                         _transport);
 
+
     }
 
     @Override
     public void start()
     {
+        _idleTimeoutChecker.start();
+
         _server = new Server();
 
         final AbstractConnector connector;
@@ -238,7 +248,8 @@ class WebSocketProvider implements Accep
     @Override
     public void close()
     {
-
+        _closed.set(true);
+        _idleTimeoutChecker.wakeup();
     }
 
     @Override
@@ -291,13 +302,14 @@ class WebSocketProvider implements Accep
                     buffer.dispose();
 
                     _connectionWrapper.doWrite();
-
                 }
                 finally
                 {
                     _protocolEngine.setIOThread(null);
                 }
             }
+            _idleTimeoutChecker.wakeup();
+
         }
 
         @Override
@@ -308,8 +320,11 @@ class WebSocketProvider implements Accep
 
             connection.setMaxBinaryMessageSize(0);
 
+            // Let AMQP do timeout handling
+            connection.setMaxIdleTime(0);
+
             _connectionWrapper =
-                    new ConnectionWrapper(connection, _localAddress, _remoteAddress, _protocolEngine);
+                    new ConnectionWrapper(connection, _localAddress, _remoteAddress, _protocolEngine, _threadPool);
             _connectionWrapper.setPeerCertificate(_userCertificate);
             _protocolEngine.setNetworkConnection(_connectionWrapper);
             _protocolEngine.setWorkListener(new Action<ProtocolEngine>()
@@ -327,14 +342,16 @@ class WebSocketProvider implements Accep
                     });
                 }
             });
-
-
+            _activeConnections.add(_connectionWrapper);
+            _idleTimeoutChecker.wakeup();
         }
 
         @Override
         public void onClose(final int closeCode, final String message)
         {
             _protocolEngine.closed();
+            _activeConnections.remove(_connectionWrapper);
+            _idleTimeoutChecker.wakeup();
         }
     }
 
@@ -345,7 +362,8 @@ class WebSocketProvider implements Accep
         private final SocketAddress _remoteAddress;
         private final ConcurrentLinkedQueue<QpidByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
         private final MultiVersionProtocolEngine _protocolEngine;
-        private final AtomicLong _usedOutboundMessageSpace = new AtomicLong();
+        private final ThreadPool _threadPool;
+        private final Runnable _tickJob;
 
         private Certificate _certificate;
         private long _maxWriteIdleMillis;
@@ -353,12 +371,27 @@ class WebSocketProvider implements Accep
 
         public ConnectionWrapper(final WebSocket.Connection connection,
                                  final SocketAddress localAddress,
-                                 final SocketAddress remoteAddress, final MultiVersionProtocolEngine protocolEngine)
+                                 final SocketAddress remoteAddress,
+                                 final MultiVersionProtocolEngine protocolEngine,
+                                 final ThreadPool threadPool)
         {
             _connection = connection;
             _localAddress = localAddress;
             _remoteAddress = remoteAddress;
             _protocolEngine = protocolEngine;
+            _threadPool = threadPool;
+            _tickJob = new Runnable()
+                        {
+                            @Override
+                            public void run()
+                            {
+                                synchronized (ConnectionWrapper.this)
+                                {
+                                    protocolEngine.getAggregateTicker().tick(System.currentTimeMillis());
+                                    doWrite();
+                                }
+                            }
+                        };
         }
 
         @Override
@@ -502,7 +535,6 @@ class WebSocketProvider implements Accep
                 try
                 {
                     _connection.sendMessage(data, 0, size);
-                    _usedOutboundMessageSpace.set(0);
                 }
                 catch (IOException e)
                 {
@@ -526,7 +558,7 @@ class WebSocketProvider implements Accep
                 }
 
                 doWrite();
-
+                _idleTimeoutChecker.wakeup();
             }
             finally
             {
@@ -534,5 +566,73 @@ class WebSocketProvider implements Accep
             }
 
         }
+
+
+        public void tick()
+        {
+            _threadPool.dispatch(_tickJob);
+        }
+    }
+
+
+
+    private class WebSocketIdleTimeoutChecker extends Thread
+    {
+
+        public WebSocketIdleTimeoutChecker()
+        {
+            setName("WebSocket Idle Checker: " + _port);
+        }
+
+        @Override
+        public void run()
+        {
+            while(!_closed.get())
+            {
+                ConnectionWrapper connectionToTick = null;
+                long currentTime = System.currentTimeMillis();
+                synchronized (this)
+                {
+                    long nextTick = Long.MAX_VALUE;
+                    for(ConnectionWrapper connection : _activeConnections)
+                    {
+                        ProtocolEngine engine = connection._protocolEngine;
+                        final Ticker ticker = engine.getAggregateTicker();
+                        long tick = ticker.getTimeToNextTick(currentTime);
+                        if(tick <= 0)
+                        {
+                            connectionToTick = connection;
+                            nextTick = -1;
+                            break;
+                        }
+                        else if(tick < nextTick)
+                        {
+                            nextTick = tick;
+                        }
+                    }
+                    if(nextTick > 0)
+                    {
+                        try
+                        {
+                            wait(nextTick);
+                        }
+                        catch (InterruptedException e)
+                        {
+                            Thread.currentThread().interrupt();
+                            break;
+                        }
+                    }
+                }
+                if(connectionToTick != null)
+                {
+                    connectionToTick.tick();
+                }
+            }
+        }
+
+        private synchronized void wakeup()
+        {
+            notifyAll();
+        }
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org