You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/10/23 17:48:42 UTC

svn commit: r1710248 - /qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java

Author: rgodfrey
Date: Fri Oct 23 15:48:42 2015
New Revision: 1710248

URL: http://svn.apache.org/viewvc?rev=1710248&view=rev
Log:
QPID-6810 : Update WebSockets plugin to align with new IO model

Modified:
    qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java

Modified: qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1710248&r1=1710247&r2=1710248&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Fri Oct 23 15:48:42 2015
@@ -23,18 +23,21 @@ package org.apache.qpid.server.transport
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.nio.ByteBuffer;
 import java.security.Principal;
 import java.security.cert.Certificate;
 import java.security.cert.X509Certificate;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import javax.net.ssl.SSLContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.eclipse.jetty.server.AbstractConnector;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Request;
 import org.eclipse.jetty.server.Server;
@@ -42,6 +45,7 @@ import org.eclipse.jetty.server.handler.
 import org.eclipse.jetty.server.nio.SelectChannelConnector;
 import org.eclipse.jetty.server.ssl.SslSelectChannelConnector;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.ThreadPool;
 import org.eclipse.jetty.websocket.WebSocket;
 import org.eclipse.jetty.websocket.WebSocketHandler;
 
@@ -53,6 +57,8 @@ import org.apache.qpid.server.model.Tran
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.transport.MultiVersionProtocolEngineFactory;
 import org.apache.qpid.server.transport.AcceptingTransport;
+import org.apache.qpid.server.transport.ProtocolEngine;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.network.NetworkConnection;
@@ -95,7 +101,7 @@ class WebSocketProvider implements Accep
     {
         _server = new Server();
 
-        Connector connector = null;
+        final AbstractConnector connector;
 
 
         if (_transport == Transport.WS)
@@ -149,7 +155,7 @@ class WebSocketProvider implements Accep
 
                 SocketAddress remoteAddress = new InetSocketAddress(request.getRemoteHost(), request.getRemotePort());
                 SocketAddress localAddress = new InetSocketAddress(request.getLocalName(), request.getLocalPort());
-                return AMQP_WEBSOCKET_SUBPROTOCOL.equals(protocol) ? new AmqpWebSocket(_transport, localAddress, remoteAddress, certificate) : null;
+                return new AmqpWebSocket(_transport, localAddress, remoteAddress, certificate, connector.getThreadPool());
             }
         };
 
@@ -206,45 +212,78 @@ class WebSocketProvider implements Accep
         private final SocketAddress _localAddress;
         private final SocketAddress _remoteAddress;
         private final Certificate _userCertificate;
-        private Connection _connection;
-        private final Transport _transport;
-        private MultiVersionProtocolEngine _engine;
+        private final ThreadPool _threadPool;
+        private volatile MultiVersionProtocolEngine _protocolEngine;
+        private volatile ConnectionWrapper _connectionWrapper;
 
         private AmqpWebSocket(final Transport transport,
                               final SocketAddress localAddress,
                               final SocketAddress remoteAddress,
-                              final Certificate userCertificate)
+                              final Certificate userCertificate,
+                              final ThreadPool threadPool)
         {
-            _transport = transport;
             _localAddress = localAddress;
             _remoteAddress = remoteAddress;
             _userCertificate = userCertificate;
+            _threadPool = threadPool;
         }
 
         @Override
         public void onMessage(final byte[] data, final int offset, final int length)
         {
-            _engine.received(QpidByteBuffer.wrap(data, offset, length).slice());
+            synchronized (_connectionWrapper)
+            {
+
+                _protocolEngine.clearWork();
+                _protocolEngine.setMessageAssignmentSuspended(true);
+
+                _protocolEngine.processPending();
+
+                QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(length);
+                buffer.put(data,offset,length);
+                buffer.flip();
+                _protocolEngine.received(buffer);
+                buffer.dispose();
+
+                _connectionWrapper.doWrite();
+
+                _protocolEngine.setMessageAssignmentSuspended(false);
+            }
         }
 
         @Override
         public void onOpen(final Connection connection)
         {
-            _connection = connection;
 
-            _engine = _factory.newProtocolEngine(_remoteAddress);
+            _protocolEngine = _factory.newProtocolEngine(_remoteAddress);
+
+            _connectionWrapper =
+                    new ConnectionWrapper(connection, _localAddress, _remoteAddress, _protocolEngine);
+            _connectionWrapper.setPeerCertificate(_userCertificate);
+            _protocolEngine.setNetworkConnection(_connectionWrapper);
+            _protocolEngine.setWorkListener(new Action<ProtocolEngine>()
+            {
+                @Override
+                public void performAction(final ProtocolEngine object)
+                {
+                    _threadPool.dispatch(new Runnable()
+                    {
+                        @Override
+                        public void run()
+                        {
+                            _connectionWrapper.doWork();
+                        }
+                    });
+                }
+            });
 
-            final ConnectionWrapper connectionWrapper =
-                    new ConnectionWrapper(connection, _localAddress, _remoteAddress);
-            connectionWrapper.setPeerCertificate(_userCertificate);
-            _engine.setNetworkConnection(connectionWrapper);
 
         }
 
         @Override
         public void onClose(final int closeCode, final String message)
         {
-            _engine.closed();
+            _protocolEngine.closed();
         }
     }
 
@@ -253,17 +292,21 @@ class WebSocketProvider implements Accep
         private final WebSocket.Connection _connection;
         private final SocketAddress _localAddress;
         private final SocketAddress _remoteAddress;
+        private final ConcurrentLinkedQueue<QpidByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
+        private final MultiVersionProtocolEngine _protocolEngine;
+
         private Certificate _certificate;
         private int _maxWriteIdle;
         private int _maxReadIdle;
 
         public ConnectionWrapper(final WebSocket.Connection connection,
                                  final SocketAddress localAddress,
-                                 final SocketAddress remoteAddress)
+                                 final SocketAddress remoteAddress, final MultiVersionProtocolEngine protocolEngine)
         {
             _connection = connection;
             _localAddress = localAddress;
             _remoteAddress = remoteAddress;
+            _protocolEngine = protocolEngine;
         }
 
         @Override
@@ -278,52 +321,16 @@ class WebSocketProvider implements Accep
 
         }
 
-        private void send(final ByteBuffer msg)
-        {
-            try
-            {
-                if (msg.hasArray())
-                {
-                    _connection.sendMessage(msg.array(), msg.arrayOffset() + msg.position(), msg.remaining());
-                }
-                else
-                {
-                    byte[] copy = new byte[msg.remaining()];
-                    msg.duplicate().get(copy);
-                    _connection.sendMessage(copy, 0, copy.length);
-                }
-            }
-            catch (IOException e)
-            {
-                close();
-            }
-        }
-
         @Override
         public void send(final QpidByteBuffer msg)
         {
-            try
+            if (msg.remaining() > 0)
             {
-                if (msg.hasArray())
-                {
-                    _connection.sendMessage(msg.array(), msg.arrayOffset() + msg.position(), msg.remaining());
-                }
-                else
-                {
-                    byte[] copy = new byte[msg.remaining()];
-                    final QpidByteBuffer duplicate = msg.duplicate();
-                    duplicate.get(copy);
-                    duplicate.dispose();
-                    _connection.sendMessage(copy, 0, copy.length);
-                }
-            }
-            catch (IOException e)
-            {
-                close();
+                _buffers.add(msg.duplicate());
             }
+            msg.position(msg.limit());
         }
 
-
         @Override
         public void flush()
         {
@@ -388,5 +395,49 @@ class WebSocketProvider implements Accep
         {
             _certificate = certificate;
         }
+
+        public synchronized void doWrite()
+        {
+            int size = 0;
+            List<QpidByteBuffer> toBeWritten = new ArrayList<>(_buffers.size());
+            QpidByteBuffer buf;
+            while((buf = _buffers.poll())!= null)
+            {
+                size += buf.remaining();
+                toBeWritten.add(buf);
+            }
+
+            byte[] data = new byte[size];
+            int offset = 0;
+
+            for(QpidByteBuffer tmp : toBeWritten)
+            {
+                int remaining = tmp.remaining();
+                tmp.get(data, offset, remaining);
+                tmp.dispose();
+                offset += remaining;
+            }
+            try
+            {
+                _connection.sendMessage(data,0,size);
+            }
+            catch (IOException e)
+            {
+                close();
+            }
+        }
+
+        public synchronized void doWork()
+        {
+            _protocolEngine.clearWork();
+            _protocolEngine.setMessageAssignmentSuspended(true);
+
+            _protocolEngine.processPending();
+
+            doWrite();
+
+            _protocolEngine.setMessageAssignmentSuspended(false);
+
+        }
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org