You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/10/23 17:48:42 UTC
svn commit: r1710248 -
/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
Author: rgodfrey
Date: Fri Oct 23 15:48:42 2015
New Revision: 1710248
URL: http://svn.apache.org/viewvc?rev=1710248&view=rev
Log:
QPID-6810 : Update WebSockets plugin to align with new IO model
Modified:
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
Modified: qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1710248&r1=1710247&r2=1710248&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Fri Oct 23 15:48:42 2015
@@ -23,18 +23,21 @@ package org.apache.qpid.server.transport
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.security.Principal;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import javax.net.ssl.SSLContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.eclipse.jetty.server.AbstractConnector;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
@@ -42,6 +45,7 @@ import org.eclipse.jetty.server.handler.
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.server.ssl.SslSelectChannelConnector;
import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.ThreadPool;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocketHandler;
@@ -53,6 +57,8 @@ import org.apache.qpid.server.model.Tran
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.transport.MultiVersionProtocolEngineFactory;
import org.apache.qpid.server.transport.AcceptingTransport;
+import org.apache.qpid.server.transport.ProtocolEngine;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -95,7 +101,7 @@ class WebSocketProvider implements Accep
{
_server = new Server();
- Connector connector = null;
+ final AbstractConnector connector;
if (_transport == Transport.WS)
@@ -149,7 +155,7 @@ class WebSocketProvider implements Accep
SocketAddress remoteAddress = new InetSocketAddress(request.getRemoteHost(), request.getRemotePort());
SocketAddress localAddress = new InetSocketAddress(request.getLocalName(), request.getLocalPort());
- return AMQP_WEBSOCKET_SUBPROTOCOL.equals(protocol) ? new AmqpWebSocket(_transport, localAddress, remoteAddress, certificate) : null;
+ return new AmqpWebSocket(_transport, localAddress, remoteAddress, certificate, connector.getThreadPool());
}
};
@@ -206,45 +212,78 @@ class WebSocketProvider implements Accep
private final SocketAddress _localAddress;
private final SocketAddress _remoteAddress;
private final Certificate _userCertificate;
- private Connection _connection;
- private final Transport _transport;
- private MultiVersionProtocolEngine _engine;
+ private final ThreadPool _threadPool;
+ private volatile MultiVersionProtocolEngine _protocolEngine;
+ private volatile ConnectionWrapper _connectionWrapper;
private AmqpWebSocket(final Transport transport,
final SocketAddress localAddress,
final SocketAddress remoteAddress,
- final Certificate userCertificate)
+ final Certificate userCertificate,
+ final ThreadPool threadPool)
{
- _transport = transport;
_localAddress = localAddress;
_remoteAddress = remoteAddress;
_userCertificate = userCertificate;
+ _threadPool = threadPool;
}
@Override
public void onMessage(final byte[] data, final int offset, final int length)
{
- _engine.received(QpidByteBuffer.wrap(data, offset, length).slice());
+ synchronized (_connectionWrapper)
+ {
+
+ _protocolEngine.clearWork();
+ _protocolEngine.setMessageAssignmentSuspended(true);
+
+ _protocolEngine.processPending();
+
+ QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(length);
+ buffer.put(data,offset,length);
+ buffer.flip();
+ _protocolEngine.received(buffer);
+ buffer.dispose();
+
+ _connectionWrapper.doWrite();
+
+ _protocolEngine.setMessageAssignmentSuspended(false);
+ }
}
@Override
public void onOpen(final Connection connection)
{
- _connection = connection;
- _engine = _factory.newProtocolEngine(_remoteAddress);
+ _protocolEngine = _factory.newProtocolEngine(_remoteAddress);
+
+ _connectionWrapper =
+ new ConnectionWrapper(connection, _localAddress, _remoteAddress, _protocolEngine);
+ _connectionWrapper.setPeerCertificate(_userCertificate);
+ _protocolEngine.setNetworkConnection(_connectionWrapper);
+ _protocolEngine.setWorkListener(new Action<ProtocolEngine>()
+ {
+ @Override
+ public void performAction(final ProtocolEngine object)
+ {
+ _threadPool.dispatch(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ _connectionWrapper.doWork();
+ }
+ });
+ }
+ });
- final ConnectionWrapper connectionWrapper =
- new ConnectionWrapper(connection, _localAddress, _remoteAddress);
- connectionWrapper.setPeerCertificate(_userCertificate);
- _engine.setNetworkConnection(connectionWrapper);
}
@Override
public void onClose(final int closeCode, final String message)
{
- _engine.closed();
+ _protocolEngine.closed();
}
}
@@ -253,17 +292,21 @@ class WebSocketProvider implements Accep
private final WebSocket.Connection _connection;
private final SocketAddress _localAddress;
private final SocketAddress _remoteAddress;
+ private final ConcurrentLinkedQueue<QpidByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
+ private final MultiVersionProtocolEngine _protocolEngine;
+
private Certificate _certificate;
private int _maxWriteIdle;
private int _maxReadIdle;
public ConnectionWrapper(final WebSocket.Connection connection,
final SocketAddress localAddress,
- final SocketAddress remoteAddress)
+ final SocketAddress remoteAddress, final MultiVersionProtocolEngine protocolEngine)
{
_connection = connection;
_localAddress = localAddress;
_remoteAddress = remoteAddress;
+ _protocolEngine = protocolEngine;
}
@Override
@@ -278,52 +321,16 @@ class WebSocketProvider implements Accep
}
- private void send(final ByteBuffer msg)
- {
- try
- {
- if (msg.hasArray())
- {
- _connection.sendMessage(msg.array(), msg.arrayOffset() + msg.position(), msg.remaining());
- }
- else
- {
- byte[] copy = new byte[msg.remaining()];
- msg.duplicate().get(copy);
- _connection.sendMessage(copy, 0, copy.length);
- }
- }
- catch (IOException e)
- {
- close();
- }
- }
-
@Override
public void send(final QpidByteBuffer msg)
{
- try
+ if (msg.remaining() > 0)
{
- if (msg.hasArray())
- {
- _connection.sendMessage(msg.array(), msg.arrayOffset() + msg.position(), msg.remaining());
- }
- else
- {
- byte[] copy = new byte[msg.remaining()];
- final QpidByteBuffer duplicate = msg.duplicate();
- duplicate.get(copy);
- duplicate.dispose();
- _connection.sendMessage(copy, 0, copy.length);
- }
- }
- catch (IOException e)
- {
- close();
+ _buffers.add(msg.duplicate());
}
+ msg.position(msg.limit());
}
-
@Override
public void flush()
{
@@ -388,5 +395,49 @@ class WebSocketProvider implements Accep
{
_certificate = certificate;
}
+
+ public synchronized void doWrite()
+ {
+ int size = 0;
+ List<QpidByteBuffer> toBeWritten = new ArrayList<>(_buffers.size());
+ QpidByteBuffer buf;
+ while((buf = _buffers.poll())!= null)
+ {
+ size += buf.remaining();
+ toBeWritten.add(buf);
+ }
+
+ byte[] data = new byte[size];
+ int offset = 0;
+
+ for(QpidByteBuffer tmp : toBeWritten)
+ {
+ int remaining = tmp.remaining();
+ tmp.get(data, offset, remaining);
+ tmp.dispose();
+ offset += remaining;
+ }
+ try
+ {
+ _connection.sendMessage(data,0,size);
+ }
+ catch (IOException e)
+ {
+ close();
+ }
+ }
+
+ public synchronized void doWork()
+ {
+ _protocolEngine.clearWork();
+ _protocolEngine.setMessageAssignmentSuspended(true);
+
+ _protocolEngine.processPending();
+
+ doWrite();
+
+ _protocolEngine.setMessageAssignmentSuspended(false);
+
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org