You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/12/11 14:08:36 UTC
svn commit: r1644625 - in /qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java:
broker-core/src/main/java/org/apache/qpid/server/protocol/
broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/
common/src/main/java/org/apach...
Author: kwall
Date: Thu Dec 11 13:08:36 2014
New Revision: 1644625
URL: http://svn.apache.org/r1644625
Log:
Ensure that the NonBlockingSenderReceiver closes (and its thread stops) if the peer closes the connection. Remove some more dead code.
Removed:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1644625&r1=1644624&r2=1644625&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Thu Dec 11 13:08:36 2014
@@ -84,11 +84,6 @@ public class MultiVersionProtocolEngine
_onCloseTask = onCloseTask;
}
- void setTransport(Transport transport)
- {
- _transport = transport;
- }
-
public SocketAddress getRemoteAddress()
{
return _delegate.getRemoteAddress();
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1644625&r1=1644624&r2=1644625&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java Thu Dec 11 13:08:36 2014
@@ -223,11 +223,6 @@ public class ProtocolEngine_0_10 extend
return getRemoteAddress().toString();
}
- public String getAuthId()
- {
- return _connection.getAuthorizedPrincipal() == null ? null : _connection.getAuthorizedPrincipal().getName();
- }
-
public boolean isDurable()
{
return false;
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java?rev=1644625&r1=1644624&r2=1644625&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java Thu Dec 11 13:08:36 2014
@@ -49,6 +49,7 @@ import org.apache.qpid.transport.network
import org.apache.qpid.transport.network.TransportEncryption;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+// TODO we are no longer using the IncomingNetworkTransport
public abstract class AbstractNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
{
private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java?rev=1644625&r1=1644624&r2=1644625&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java Thu Dec 11 13:08:36 2014
@@ -49,7 +49,7 @@ public class NonBlockingNetworkTransport
private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
- private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
+ private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
private AcceptingThread _acceptor;
@@ -70,24 +70,11 @@ public class NonBlockingNetworkTransport
public void close()
{
-/*
- if(_connection != null)
- {
- _connection.close();
- }
-*/
if(_acceptor != null)
{
_acceptor.close();
}
}
-/*
-
- public NetworkConnection getConnection()
- {
- return _connection;
- }
-*/
public void accept(NetworkTransportConfiguration config,
ProtocolEngineFactory factory,
@@ -115,10 +102,10 @@ public class NonBlockingNetworkTransport
{
private final Set<TransportEncryption> _encryptionSet;
private volatile boolean _closed = false;
- private NetworkTransportConfiguration _config;
- private ProtocolEngineFactory _factory;
- private SSLContext _sslContext;
- private ServerSocketChannel _serverSocket;
+ private final NetworkTransportConfiguration _config;
+ private final ProtocolEngineFactory _factory;
+ private final SSLContext _sslContext;
+ private final ServerSocketChannel _serverSocket;
private int _timeout;
private AcceptingThread(NetworkTransportConfiguration config,
@@ -184,7 +171,7 @@ public class NonBlockingNetworkTransport
if(engine != null)
{
socket.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
- socket.socket().setSoTimeout(1000 * HANSHAKE_TIMEOUT);
+ socket.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
final Integer sendBufferSize = _config.getSendBufferSize();
final Integer receiveBufferSize = _config.getReceiveBufferSize();
@@ -216,12 +203,11 @@ public class NonBlockingNetworkTransport
}
});
- connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
+ engine.setNetworkConnection(connection, connection.getSender());
+ connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
ticker.setConnection(connection);
- engine.setNetworkConnection(connection, connection.getSender());
-
connection.start();
}
else
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java?rev=1644625&r1=1644624&r2=1644625&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java Thu Dec 11 13:08:36 2014
@@ -178,6 +178,7 @@ public class NonBlockingSenderReceiver
// read as much as you can
// try to write all pending byte buffers
+
while (!_closed.get())
{
@@ -206,6 +207,7 @@ public class NonBlockingSenderReceiver
fullyWritten
? SelectionKey.OP_READ
: (SelectionKey.OP_WRITE | SelectionKey.OP_READ));
+
}
catch (IOException e)
{
@@ -359,7 +361,11 @@ public class NonBlockingSenderReceiver
{
_currentBuffer = ByteBuffer.allocate(_receiveBufSize);
}
- _socketChannel.read(_currentBuffer);
+ int read = _socketChannel.read(_currentBuffer);
+ if (read == -1)
+ {
+ _closed.set(true);
+ }
remaining = _currentBuffer.remaining();
if (LOGGER.isDebugEnabled())
{
@@ -378,6 +384,11 @@ public class NonBlockingSenderReceiver
while(!_closed.get() && (read > 0 || unwrapped > 0) && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED))
{
read = _socketChannel.read(_netInputBuffer);
+ if (read == -1)
+ {
+ _closed.set(true);
+ }
+
LOGGER.debug("Read " + read + " encrypted bytes " + _netInputBuffer);
_netInputBuffer.flip();
ByteBuffer appInputBuffer =
@@ -403,6 +414,11 @@ public class NonBlockingSenderReceiver
{
read = _socketChannel.read(_netInputBuffer);
+ if (read == -1)
+ {
+ _closed.set(true);
+ }
+
LOGGER.debug("Read " + read + " possibly encrypted bytes " + _netInputBuffer);
if (_netInputBuffer.position() >= 6)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org