You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/01/29 12:28:32 UTC

svn commit: r1655596 - in /qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io: NonBlockingSenderReceiver.java SelectorThread.java

Author: rgodfrey
Date: Thu Jan 29 11:28:32 2015
New Revision: 1655596

URL: http://svn.apache.org/r1655596
Log:
Ensure selectors are closed, and all work which can be done on a read is done

Modified:
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java?rev=1655596&r1=1655595&r2=1655596&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java Thu Jan 29 11:28:32 2015
@@ -21,8 +21,6 @@ package org.apache.qpid.transport.networ
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
 import java.security.Principal;
 import java.util.ArrayList;
@@ -42,7 +40,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.thread.Threading;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.SenderClosedException;
 import org.apache.qpid.transport.SenderException;
@@ -78,6 +75,7 @@ public class NonBlockingSenderReceiver
     private SSLEngineResult _status;
     private volatile boolean _fullyWritten = true;
     private AtomicBoolean _stateChanged = new AtomicBoolean();
+    private boolean _workDone;
 
 
     public NonBlockingSenderReceiver(final NonBlockingConnection connection,
@@ -152,12 +150,13 @@ public class NonBlockingSenderReceiver
     public boolean doWork()
     {
         _stateChanged.set(false);
-
         boolean closed = _closed.get();
         if (!closed)
         {
             try
             {
+                _workDone = false;
+
                 long currentTime = System.currentTimeMillis();
                 int tick = _ticker.getTimeToNextTick(currentTime);
                 if (tick <= 0)
@@ -170,6 +169,10 @@ public class NonBlockingSenderReceiver
                 _fullyWritten = doWrite();
                 _receiver.setTransportBlockedForWriting(!_fullyWritten);
 
+                if(_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0)
+                {
+                    _stateChanged.set(true);
+                }
             }
             catch (IOException e)
             {
@@ -267,7 +270,6 @@ public class NonBlockingSenderReceiver
     public void close()
     {
         LOGGER.debug("Closing " + _remoteSocketAddress);
-
         _closed.set(true);
         _stateChanged.set(true);
         _connection.getSelector().wakeup();
@@ -310,11 +312,11 @@ public class NonBlockingSenderReceiver
         else if(_transportEncryption == TransportEncryption.TLS)
         {
             int remaining = 0;
-
             do
             {
                 if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP)
                 {
+                    _workDone = true;
                     final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
                     _status = _sslEngine.wrap(bufArray, netBuffer);
                     runSSLEngineTasks(_status);
@@ -337,14 +339,12 @@ public class NonBlockingSenderReceiver
 
             }
             while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP);
-
             ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]);
             long written  = _socketChannel.write(encryptedBuffers);
             if (LOGGER.isDebugEnabled())
             {
                 LOGGER.debug("Written " + written + " encrypted bytes");
             }
-
             ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator();
             while(iter.hasNext())
             {
@@ -409,25 +409,27 @@ public class NonBlockingSenderReceiver
 
                 if (LOGGER.isDebugEnabled())
                 {
-                    LOGGER.debug("Read " + read + " encrypted bytes " + _netInputBuffer);
+                    LOGGER.debug("Read " + read + " encrypted bytes ");
                 }
+
                 _netInputBuffer.flip();
 
 
                 int unwrapped = 0;
+                boolean tasksRun;
                 do
                 {
                     ByteBuffer appInputBuffer =
                             ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50);
 
                     _status = _sslEngine.unwrap(_netInputBuffer, appInputBuffer);
-                    runSSLEngineTasks(_status);
+                    tasksRun = runSSLEngineTasks(_status);
 
                     appInputBuffer.flip();
                     unwrapped = appInputBuffer.remaining();
                     _receiver.received(appInputBuffer);
                 }
-                while(unwrapped > 0);
+                while(unwrapped > 0 || tasksRun);
 
                 _netInputBuffer.compact();
 
@@ -476,7 +478,7 @@ public class NonBlockingSenderReceiver
         }
     }
 
-    private void runSSLEngineTasks(final SSLEngineResult status)
+    private boolean runSSLEngineTasks(final SSLEngineResult status)
     {
         if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK)
         {
@@ -485,7 +487,9 @@ public class NonBlockingSenderReceiver
             {
                 task.run();
             }
+            return true;
         }
+        return false;
     }
 
     private boolean looksLikeSSL(byte[] headerBytes)

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java?rev=1655596&r1=1655595&r2=1655596&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java Thu Jan 29 11:28:32 2015
@@ -39,7 +39,7 @@ public class SelectorThread extends Thre
 
     private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>();
     private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>();
-    private final Selector _selector;
+    private Selector _selector;
     private final AtomicBoolean _closed = new AtomicBoolean();
     private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler();
 
@@ -62,86 +62,80 @@ public class SelectorThread extends Thre
     {
 
         long nextTimeout = 0;
-        while(!_closed.get())
-        {
-
 
-            try
+        try
+        {
+            try (Selector selector = Selector.open())
             {
+                _selector = selector;
+                while (!_closed.get())
+                {
 
-                _selector.select(nextTimeout);
-
-                List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
-
+                    _selector.select(nextTimeout);
 
+                    List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
 
-                Set<SelectionKey> selectionKeys = _selector.selectedKeys();
-                for (SelectionKey key : selectionKeys)
-                {
-                    NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
 
-                    key.channel().register(_selector, 0);
+                    Set<SelectionKey> selectionKeys = _selector.selectedKeys();
+                    for (SelectionKey key : selectionKeys)
+                    {
+                        NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
 
-                    toBeScheduled.add(connection);
-                    _unscheduledConnections.remove(connection);
+                        key.channel().register(_selector, 0);
 
-                }
-                selectionKeys.clear();
+                        toBeScheduled.add(connection);
+                        _unscheduledConnections.remove(connection);
 
-                while(_unregisteredConnections.peek() != null)
-                {
-                    NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll();
-                    _unscheduledConnections.add(unregisteredConnection);
+                    }
+                    selectionKeys.clear();
 
+                    while (_unregisteredConnections.peek() != null)
+                    {
+                        NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll();
+                        _unscheduledConnections.add(unregisteredConnection);
 
-                    final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0) | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
-                    unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
 
-                }
+                        final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0)
+                                        | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
+                        unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
 
-                long currentTime = System.currentTimeMillis();
-                Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator();
-                nextTimeout = Integer.MAX_VALUE;
-                while(iterator.hasNext())
-                {
-                    NonBlockingConnection connection = iterator.next();
+                    }
 
-                    int period = connection.getTicker().getTimeToNextTick(currentTime);
-                    if (period < 0 || connection.isStateChanged())
+                    long currentTime = System.currentTimeMillis();
+                    Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator();
+                    nextTimeout = Integer.MAX_VALUE;
+                    while (iterator.hasNext())
                     {
-                        toBeScheduled.add(connection);
-                        iterator.remove();
+                        NonBlockingConnection connection = iterator.next();
+
+                        int period = connection.getTicker().getTimeToNextTick(currentTime);
+                        if (period < 0 || connection.isStateChanged())
+                        {
+                            toBeScheduled.add(connection);
+                            iterator.remove();
+                        }
+                        else
+                        {
+                            nextTimeout = Math.min(period, nextTimeout);
+                        }
                     }
-                    else
+
+                    for (NonBlockingConnection connection : toBeScheduled)
                     {
-                        nextTimeout = Math.min(period, nextTimeout);
+                        _scheduler.schedule(connection);
                     }
-                }
 
-                for(NonBlockingConnection connection : toBeScheduled)
-                {
-                    _scheduler.schedule(connection);
                 }
-
             }
-            catch (IOException e)
-            {
-                // Close ourselves?  Inform accepting thread??
-                e.printStackTrace();
-            }
-
-        }
-        try
-        {
-            _selector.close();
         }
         catch (IOException e)
         {
-            // TODO
+            //TODO
             e.printStackTrace();
         }
 
 
+
     }
 
     public void addConnection(final NonBlockingConnection connection)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org