You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/12/11 14:08:36 UTC

svn commit: r1644625 - in /qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java: broker-core/src/main/java/org/apache/qpid/server/protocol/ broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ common/src/main/java/org/apach...

Author: kwall
Date: Thu Dec 11 13:08:36 2014
New Revision: 1644625

URL: http://svn.apache.org/r1644625
Log:
Ensure that the NonBlockingSenderReceiver closes (and its thread stops) if the peer closes the connection. Remove some more dead code.

Removed:
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java
Modified:
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1644625&r1=1644624&r2=1644625&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Thu Dec 11 13:08:36 2014
@@ -84,11 +84,6 @@ public class MultiVersionProtocolEngine
         _onCloseTask = onCloseTask;
     }
 
-    void setTransport(Transport transport)
-    {
-        _transport = transport;
-    }
-
     public SocketAddress getRemoteAddress()
     {
         return _delegate.getRemoteAddress();

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1644625&r1=1644624&r2=1644625&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java Thu Dec 11 13:08:36 2014
@@ -223,11 +223,6 @@ public class ProtocolEngine_0_10  extend
         return getRemoteAddress().toString();
     }
 
-    public String getAuthId()
-    {
-        return _connection.getAuthorizedPrincipal() == null ? null : _connection.getAuthorizedPrincipal().getName();
-    }
-
     public boolean isDurable()
     {
         return false;

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java?rev=1644625&r1=1644624&r2=1644625&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java Thu Dec 11 13:08:36 2014
@@ -49,6 +49,7 @@ import org.apache.qpid.transport.network
 import org.apache.qpid.transport.network.TransportEncryption;
 import org.apache.qpid.transport.network.security.ssl.SSLUtil;
 
+// TODO we are no longer using the IncomingNetworkTransport
 public abstract class AbstractNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
 {
     private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java?rev=1644625&r1=1644624&r2=1644625&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java Thu Dec 11 13:08:36 2014
@@ -49,7 +49,7 @@ public class NonBlockingNetworkTransport
     private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
     private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
                                                           CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
-    private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
+    private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
                                                                    CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
     private AcceptingThread _acceptor;
 
@@ -70,24 +70,11 @@ public class NonBlockingNetworkTransport
 
     public void close()
     {
-/*
-        if(_connection != null)
-        {
-            _connection.close();
-        }
-*/
         if(_acceptor != null)
         {
             _acceptor.close();
         }
     }
-/*
-
-    public NetworkConnection getConnection()
-    {
-        return _connection;
-    }
-*/
 
     public void accept(NetworkTransportConfiguration config,
                        ProtocolEngineFactory factory,
@@ -115,10 +102,10 @@ public class NonBlockingNetworkTransport
     {
         private final Set<TransportEncryption> _encryptionSet;
         private volatile boolean _closed = false;
-        private NetworkTransportConfiguration _config;
-        private ProtocolEngineFactory _factory;
-        private SSLContext _sslContext;
-        private ServerSocketChannel _serverSocket;
+        private final NetworkTransportConfiguration _config;
+        private final ProtocolEngineFactory _factory;
+        private final SSLContext _sslContext;
+        private final ServerSocketChannel _serverSocket;
         private int _timeout;
 
         private AcceptingThread(NetworkTransportConfiguration config,
@@ -184,7 +171,7 @@ public class NonBlockingNetworkTransport
                         if(engine != null)
                         {
                             socket.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
-                            socket.socket().setSoTimeout(1000 * HANSHAKE_TIMEOUT);
+                            socket.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
 
                             final Integer sendBufferSize = _config.getSendBufferSize();
                             final Integer receiveBufferSize = _config.getReceiveBufferSize();
@@ -216,12 +203,11 @@ public class NonBlockingNetworkTransport
                                                                 }
                                                             });
 
-                            connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
+                            engine.setNetworkConnection(connection, connection.getSender());
+                            connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
 
                             ticker.setConnection(connection);
 
-                            engine.setNetworkConnection(connection, connection.getSender());
-
                             connection.start();
                         }
                         else

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java?rev=1644625&r1=1644624&r2=1644625&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java Thu Dec 11 13:08:36 2014
@@ -178,6 +178,7 @@ public class NonBlockingSenderReceiver
         //  read as much as you can
         //  try to write all pending byte buffers
 
+
         while (!_closed.get())
         {
 
@@ -206,6 +207,7 @@ public class NonBlockingSenderReceiver
                                         fullyWritten
                                                 ? SelectionKey.OP_READ
                                                 : (SelectionKey.OP_WRITE | SelectionKey.OP_READ));
+
             }
             catch (IOException e)
             {
@@ -359,7 +361,11 @@ public class NonBlockingSenderReceiver
                 {
                     _currentBuffer = ByteBuffer.allocate(_receiveBufSize);
                 }
-                _socketChannel.read(_currentBuffer);
+                int read = _socketChannel.read(_currentBuffer);
+                if (read == -1)
+                {
+                    _closed.set(true);
+                }
                 remaining = _currentBuffer.remaining();
                 if (LOGGER.isDebugEnabled())
                 {
@@ -378,6 +384,11 @@ public class NonBlockingSenderReceiver
             while(!_closed.get() && (read > 0 || unwrapped > 0) && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED))
             {
                 read = _socketChannel.read(_netInputBuffer);
+                if (read == -1)
+                {
+                    _closed.set(true);
+                }
+
                 LOGGER.debug("Read " + read + " encrypted bytes " + _netInputBuffer);
                 _netInputBuffer.flip();
                 ByteBuffer appInputBuffer =
@@ -403,6 +414,11 @@ public class NonBlockingSenderReceiver
             {
 
                 read = _socketChannel.read(_netInputBuffer);
+                if (read == -1)
+                {
+                    _closed.set(true);
+                }
+
                 LOGGER.debug("Read " + read + " possibly encrypted bytes " + _netInputBuffer);
 
                 if (_netInputBuffer.position() >= 6)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org