You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/02/14 12:38:46 UTC

svn commit: r1782953 - in /qpid/java/branches/6.1.x: ./ broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/

Author: orudyy
Date: Tue Feb 14 12:38:46 2017
New Revision: 1782953

URL: http://svn.apache.org/viewvc?rev=1782953&view=rev
Log:
QPID-7670 : WebSocket transport does not respect AMQP idle timeout

merged from trunk using
svn merge -c 1782735 ^/qpid/java/trunk
Merge conflicts were resolved manually.

Modified:
    qpid/java/branches/6.1.x/   (props changed)
    qpid/java/branches/6.1.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
    qpid/java/branches/6.1.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java

Propchange: qpid/java/branches/6.1.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 14 12:38:46 2017
@@ -9,5 +9,5 @@
 /qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
 /qpid/branches/java-network-refactor/qpid/java:805429-821809
 /qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk:1766544,1766547,1766553,1766666,1766796-1766797,1766806,1767251,1767267-1767268,1767275,1767310,1767326,1767329,1767332,1767514,1767523,1767738,1767825,1767847-1767849,1767882,1767909,1767914,1768016-1768017,1768065,1768643,1768704,1768854,1768875,1768914,1768963,1768967,1768976,1769007,1769009,1769087,1769138-1769139,1769597,1769879,1770236,1770716,1772241,1772365,1772574,1773057,1774039,1774446,1774564,1774885,1775087,1775100,1777939
+/qpid/java/trunk:1766544,1766547,1766553,1766666,1766796-1766797,1766806,1767251,1767267-1767268,1767275,1767310,1767326,1767329,1767332,1767514,1767523,1767738,1767825,1767847-1767849,1767882,1767909,1767914,1768016-1768017,1768065,1768643,1768704,1768854,1768875,1768914,1768963,1768967,1768976,1769007,1769009,1769087,1769138-1769139,1769597,1769879,1770236,1770716,1772241,1772365,1772574,1773057,1774039,1774446,1774564,1774885,1775087,1775100,1777939,1782735
 /qpid/trunk/qpid:796646-796653

Modified: qpid/java/branches/6.1.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.1.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1782953&r1=1782952&r2=1782953&view=diff
==============================================================================
--- qpid/java/branches/6.1.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/branches/6.1.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Tue Feb 14 12:38:46 2017
@@ -1304,7 +1304,7 @@ public class AMQPConnection_1_0 extends
         FRAME_LOGGER.debug("SEND[{}|{}] : {}",
                            getNetwork().getRemoteAddress(),
                            amqFrame.getChannel(),
-                           amqFrame.getFrameBody());
+                           amqFrame.getFrameBody() == null ? "<<HEARTBEAT>>" : amqFrame.getFrameBody());
 
         int size = _frameWriter.send(amqFrame);
         if (size > getMaxFrameSize())

Modified: qpid/java/branches/6.1.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.1.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1782953&r1=1782952&r2=1782953&view=diff
==============================================================================
--- qpid/java/branches/6.1.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/java/branches/6.1.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Tue Feb 14 12:38:46 2017
@@ -32,6 +32,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.net.ssl.SSLContext;
@@ -54,13 +56,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.transport.MultiVersionProtocolEngine;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.transport.MultiVersionProtocolEngineFactory;
 import org.apache.qpid.server.transport.AcceptingTransport;
+import org.apache.qpid.server.transport.MultiVersionProtocolEngine;
+import org.apache.qpid.server.transport.MultiVersionProtocolEngineFactory;
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.transport.SchedulingDelayNotificationListener;
 import org.apache.qpid.server.transport.ServerNetworkConnection;
@@ -68,6 +70,7 @@ import org.apache.qpid.server.util.Actio
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+import org.apache.qpid.transport.network.Ticker;
 
 class WebSocketProvider implements AcceptingTransport
 {
@@ -83,6 +86,11 @@ class WebSocketProvider implements Accep
     private Server _server;
     private final long _outboundMessageBufferLimit;
 
+    private final List<ConnectionWrapper> _activeConnections = new CopyOnWriteArrayList<>();
+
+    private final WebSocketIdleTimeoutChecker _idleTimeoutChecker = new WebSocketIdleTimeoutChecker();
+    private final AtomicBoolean _closed = new AtomicBoolean();
+
     WebSocketProvider(final Transport transport,
                       final SSLContext sslContext,
                       final AmqpPort<?> port,
@@ -104,11 +112,14 @@ class WebSocketProvider implements Accep
                         _port,
                         _transport);
 
+
     }
 
     @Override
     public void start()
     {
+        _idleTimeoutChecker.start();
+
         _server = new Server();
 
         final AbstractConnector connector;
@@ -241,7 +252,8 @@ class WebSocketProvider implements Accep
     @Override
     public void close()
     {
-
+        _closed.set(true);
+        _idleTimeoutChecker.wakeup();
     }
 
     @Override
@@ -295,7 +307,6 @@ class WebSocketProvider implements Accep
                     buffer.dispose();
 
                     _connectionWrapper.doWrite();
-
                     _protocolEngine.setMessageAssignmentSuspended(false, true);
                 }
                 finally
@@ -303,6 +314,8 @@ class WebSocketProvider implements Accep
                     _protocolEngine.setIOThread(null);
                 }
             }
+            _idleTimeoutChecker.wakeup();
+
         }
 
         @Override
@@ -313,8 +326,11 @@ class WebSocketProvider implements Accep
 
             connection.setMaxBinaryMessageSize(0);
 
+            // Let AMQP do timeout handling
+            connection.setMaxIdleTime(0);
+
             _connectionWrapper =
-                    new ConnectionWrapper(connection, _localAddress, _remoteAddress, _protocolEngine);
+                    new ConnectionWrapper(connection, _localAddress, _remoteAddress, _protocolEngine, _threadPool);
             _connectionWrapper.setPeerCertificate(_userCertificate);
             _protocolEngine.setNetworkConnection(_connectionWrapper);
             _protocolEngine.setWorkListener(new Action<ProtocolEngine>()
@@ -332,14 +348,16 @@ class WebSocketProvider implements Accep
                     });
                 }
             });
-
-
+            _activeConnections.add(_connectionWrapper);
+            _idleTimeoutChecker.wakeup();
         }
 
         @Override
         public void onClose(final int closeCode, final String message)
         {
             _protocolEngine.closed();
+            _activeConnections.remove(_connectionWrapper);
+            _idleTimeoutChecker.wakeup();
         }
     }
 
@@ -351,6 +369,8 @@ class WebSocketProvider implements Accep
         private final ConcurrentLinkedQueue<QpidByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
         private final MultiVersionProtocolEngine _protocolEngine;
         private final AtomicLong _usedOutboundMessageSpace = new AtomicLong();
+        private final ThreadPool _threadPool;
+        private final Runnable _tickJob;
 
         private Certificate _certificate;
         private long _maxWriteIdleMillis;
@@ -358,12 +378,27 @@ class WebSocketProvider implements Accep
 
         public ConnectionWrapper(final WebSocket.Connection connection,
                                  final SocketAddress localAddress,
-                                 final SocketAddress remoteAddress, final MultiVersionProtocolEngine protocolEngine)
+                                 final SocketAddress remoteAddress,
+                                 final MultiVersionProtocolEngine protocolEngine,
+                                 final ThreadPool threadPool)
         {
             _connection = connection;
             _localAddress = localAddress;
             _remoteAddress = remoteAddress;
             _protocolEngine = protocolEngine;
+            _threadPool = threadPool;
+            _tickJob = new Runnable()
+                        {
+                            @Override
+                            public void run()
+                            {
+                                synchronized (ConnectionWrapper.this)
+                                {
+                                    protocolEngine.getAggregateTicker().tick(System.currentTimeMillis());
+                                    doWrite();
+                                }
+                            }
+                        };
         }
 
         @Override
@@ -516,7 +551,6 @@ class WebSocketProvider implements Accep
                 try
                 {
                     _connection.sendMessage(data, 0, size);
-                    _usedOutboundMessageSpace.set(0);
                 }
                 catch (IOException e)
                 {
@@ -541,7 +575,7 @@ class WebSocketProvider implements Accep
                 }
 
                 doWrite();
-
+                _idleTimeoutChecker.wakeup();
                 _protocolEngine.setMessageAssignmentSuspended(false, true);
             }
             finally
@@ -550,5 +584,73 @@ class WebSocketProvider implements Accep
             }
 
         }
+
+
+        public void tick() // Asks the thread pool to run this connection's tick job.
+        {
+            _threadPool.dispatch(_tickJob); // _tickJob ticks the protocol engine's aggregate ticker and then calls doWrite(), both while synchronized on this ConnectionWrapper
+        }
+    }
+
+
+
+    private class WebSocketIdleTimeoutChecker extends Thread // Thread that scans _activeConnections, waits until the earliest ticker is due, and then calls tick() on a connection whose ticker is due.
+    {
+
+        public WebSocketIdleTimeoutChecker()
+        {
+            setName("WebSocket Idle Checker: " + _port); // thread name carries the port so the checker can be identified
+        }
+
+        @Override
+        public void run()
+        {
+            while(!_closed.get()) // keep going until close() sets _closed
+            {
+                ConnectionWrapper connectionToTick = null;
+                long currentTime = System.currentTimeMillis(); // one timestamp is used for every ticker queried in this pass
+                synchronized (this) // the monitor is this thread object; wakeup() notifies on the same monitor
+                {
+                    long nextTick = Long.MAX_VALUE; // stays at Long.MAX_VALUE when there are no active connections
+                    for(ConnectionWrapper connection : _activeConnections)
+                    {
+                        ProtocolEngine engine = connection._protocolEngine;
+                        final Ticker ticker = engine.getAggregateTicker();
+                        long tick = ticker.getTimeToNextTick(currentTime);
+                        if(tick <= 0) // this connection's ticker is already due: remember it and stop scanning
+                        {
+                            connectionToTick = connection;
+                            nextTick = -1; // negative value makes the wait below be skipped
+                            break;
+                        }
+                        else if(tick < nextTick) // otherwise track the smallest time-to-next-tick seen so far
+                        {
+                            nextTick = tick;
+                        }
+                    }
+                    if(nextTick > 0)
+                    {
+                        try
+                        {
+                            wait(nextTick); // block until the earliest tick is due or wakeup() is called. NOTE(review): _closed is tested outside this monitor, so a close()/wakeup() landing between that test and this wait looks like it could be missed - confirm
+                        }
+                        catch (InterruptedException e)
+                        {
+                            Thread.currentThread().interrupt(); // restore the interrupt flag before leaving the loop
+                            break;
+                        }
+                    }
+                }
+                if(connectionToTick != null) // tick() is invoked after the synchronized block has been left
+                {
+                    connectionToTick.tick(); // NOTE(review): tick() only dispatches a job; if that job has not run by the next pass the ticker may still report due and be dispatched again - confirm
+                }
+            }
+        }
+
+        private synchronized void wakeup() // wakes run() out of wait() so it re-scans the active connections
+        {
+            notifyAll();
+        }
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org