You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/11/12 11:24:24 UTC

svn commit: r1714003 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/transport/ broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpi...

Author: rgodfrey
Date: Thu Nov 12 10:24:24 2015
New Revision: 1714003

URL: http://svn.apache.org/viewvc?rev=1714003&view=rev
Log:
QPID-6840 : interleave calls to process pending work with network writes

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
    qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1714003&r1=1714002&r2=1714003&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java Thu Nov 12 10:24:24 2015
@@ -24,6 +24,8 @@ package org.apache.qpid.server.transport
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.security.cert.Certificate;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -197,9 +199,9 @@ public class MultiVersionProtocolEngine
     }
 
     @Override
-    public void processPending()
+    public Iterator<Runnable> processPendingIterator()
     {
-        _delegate.processPending();
+        return _delegate.processPendingIterator();
     }
 
     @Override
@@ -249,9 +251,9 @@ public class MultiVersionProtocolEngine
         }
 
         @Override
-        public void processPending()
+        public Iterator<Runnable> processPendingIterator()
         {
-
+            return Collections.emptyIterator();
         }
 
         @Override
@@ -370,9 +372,9 @@ public class MultiVersionProtocolEngine
         }
 
         @Override
-        public void processPending()
+        public Iterator<Runnable> processPendingIterator()
         {
-
+            return Collections.emptyIterator();
         }
 
         @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1714003&r1=1714002&r2=1714003&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Thu Nov 12 10:24:24 2015
@@ -26,6 +26,7 @@ import java.nio.channels.SocketChannel;
 import java.security.Principal;
 import java.security.cert.Certificate;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -71,6 +72,7 @@ public class NonBlockingConnection imple
     private volatile boolean _unexpectedByteBufferSizeReported;
     private final String _threadName;
     private volatile SelectorThread.SelectionTask _selectionTask;
+    private Iterator<Runnable> _pendingItertor;
 
     public NonBlockingConnection(SocketChannel socketChannel,
                                  ProtocolEngine protocolEngine,
@@ -211,12 +213,12 @@ public class NonBlockingConnection imple
         }
     }
 
-    public boolean canRead()
+    public boolean wantsRead()
     {
         return _fullyWritten;
     }
 
-    public boolean canWrite()
+    public boolean wantsWrite()
     {
         return !_fullyWritten;
     }
@@ -243,16 +245,33 @@ public class NonBlockingConnection imple
                 _protocolEngine.setIOThread(Thread.currentThread());
                 _protocolEngine.setMessageAssignmentSuspended(true);
 
-                if (!_fullyWritten)
+                if(_pendingItertor == null)
                 {
-                    doWrite();
+                    _pendingItertor = _protocolEngine.processPendingIterator();
                 }
 
-                if (_fullyWritten)
+                while(_pendingItertor.hasNext())
                 {
-                    _protocolEngine.processPending();
+                    long size = getBufferedSize();
+                    if(size >= _port.getNetworkBufferSize())
+                    {
+                        doWrite();
+                        if((size - getBufferedSize()) < (_port.getNetworkBufferSize()/2))
+                        {
+                            break;
+                        }
+                    }
+                    else
+                    {
+                        final Runnable task = _pendingItertor.next();
+                        task.run();
+                    }
+                }
 
-                    _protocolEngine.setTransportBlockedForWriting(!doWrite());
+                if (!_pendingItertor.hasNext())
+                {
+                    _pendingItertor = null;
+                    _protocolEngine.setTransportBlockedForWriting(false);
                     boolean dataRead = doRead();
                     _protocolEngine.setTransportBlockedForWriting(!doWrite());
 
@@ -306,6 +325,16 @@ public class NonBlockingConnection imple
 
     }
 
+    private long getBufferedSize()
+    {
+        long totalSize = 0l;
+        for(QpidByteBuffer buf : _buffers)
+        {
+            totalSize += buf.remaining();
+        }
+        return totalSize;
+    }
+
     private void shutdown()
     {
         try

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?rev=1714003&r1=1714002&r2=1714003&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java Thu Nov 12 10:24:24 2015
@@ -20,13 +20,12 @@
  */
 package org.apache.qpid.server.transport;
 
-import java.nio.ByteBuffer;
+import java.util.Iterator;
 
 import javax.security.auth.Subject;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.util.Action;
-import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.network.AggregateTicker;
 import org.apache.qpid.transport.network.TransportActivity;
 
@@ -37,43 +36,43 @@ import org.apache.qpid.transport.network
 public interface ProtocolEngine extends TransportActivity
 {
 
-   // Called by the NetworkDriver when the socket has been closed for reading
-   void closed();
+    // Called by the NetworkDriver when the socket has been closed for reading
+    void closed();
 
-   // Called when the NetworkEngine has not written data for the specified period of time (will trigger a
-   // heartbeat)
-   @Override
-   void writerIdle();
+    // Called when the NetworkEngine has not written data for the specified period of time (will trigger a
+    // heartbeat)
+    @Override
+    void writerIdle();
 
-   // Called when the NetworkEngine has not read data for the specified period of time (will close the connection)
-   @Override
-   void readerIdle();
+    // Called when the NetworkEngine has not read data for the specified period of time (will close the connection)
+    @Override
+    void readerIdle();
 
-   Subject getSubject();
+    Subject getSubject();
 
-   boolean isTransportBlockedForWriting();
+    boolean isTransportBlockedForWriting();
 
-   void setTransportBlockedForWriting(boolean blocked);
+    void setTransportBlockedForWriting(boolean blocked);
 
-   void setMessageAssignmentSuspended(boolean value);
+    void setMessageAssignmentSuspended(boolean value);
 
-   boolean isMessageAssignmentSuspended();
+    boolean isMessageAssignmentSuspended();
 
-   void processPending();
+    Iterator<Runnable> processPendingIterator();
 
-   boolean hasWork();
+    boolean hasWork();
 
-   void clearWork();
+    void clearWork();
 
-   void notifyWork();
+    void notifyWork();
 
-   void setWorkListener(Action<ProtocolEngine> listener);
+    void setWorkListener(Action<ProtocolEngine> listener);
 
-   AggregateTicker getAggregateTicker();
+    AggregateTicker getAggregateTicker();
 
-   void encryptedTransport();
+    void encryptedTransport();
 
-   void received(QpidByteBuffer msg);
+    void received(QpidByteBuffer msg);
 
-   void setIOThread(Thread ioThread);
+    void setIOThread(Thread ioThread);
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1714003&r1=1714002&r2=1714003&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Thu Nov 12 10:24:24 2015
@@ -248,8 +248,8 @@ class SelectorThread extends Thread
                 getUnscheduledConnections().add(unregisteredConnection);
 
 
-                final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0)
-                                | (unregisteredConnection.canWrite() ? SelectionKey.OP_WRITE : 0);
+                final int ops = (unregisteredConnection.wantsRead() ? SelectionKey.OP_READ : 0)
+                                | (unregisteredConnection.wantsWrite() ? SelectionKey.OP_WRITE : 0);
                 try
                 {
                     unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
@@ -509,8 +509,8 @@ class SelectorThread extends Thread
         if(selectionTask != null)
         {
             final SelectionKey selectionKey = connection.getSocketChannel().keyFor(selectionTask.getSelector());
-            int expectedOps = (connection.canRead() ? SelectionKey.OP_READ : 0)
-                              | (connection.canWrite() ? SelectionKey.OP_WRITE : 0);
+            int expectedOps = (connection.wantsRead() ? SelectionKey.OP_READ : 0)
+                              | (connection.wantsWrite() ? SelectionKey.OP_WRITE : 0);
 
             try
             {

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1714003&r1=1714002&r2=1714003&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Thu Nov 12 10:24:24 2015
@@ -25,6 +25,8 @@ import java.net.SocketAddress;
 import java.security.AccessController;
 import java.security.Principal;
 import java.security.PrivilegedAction;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -246,16 +248,23 @@ public class AMQPConnection_0_10 extends
     @Override
     public void setTransportBlockedForWriting(final boolean blocked)
     {
-        _transportBlockedForWriting = blocked;
-        _connection.transportStateChanged();
+        if(_transportBlockedForWriting != blocked)
+        {
+            _transportBlockedForWriting = blocked;
+            _connection.transportStateChanged();
+        }
     }
 
     @Override
-    public void processPending()
+    public Iterator<Runnable> processPendingIterator()
     {
         if (isIOThread())
         {
-            _connection.processPending();
+            return _connection.processPendingIterator();
+        }
+        else
+        {
+            return Collections.emptyIterator();
         }
     }
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1714003&r1=1714002&r2=1714003&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Thu Nov 12 10:24:24 2015
@@ -32,8 +32,10 @@ import java.security.Principal;
 import java.security.PrivilegedAction;
 import java.text.MessageFormat;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
@@ -578,27 +580,71 @@ public class ServerConnection extends Co
         _amqpConnection.notifyWork();
     }
 
-    public void processPending()
+    public Iterator<Runnable> processPendingIterator()
     {
-        List<? extends AMQSessionModel<?>> sessionsWithPending = new ArrayList<>(getSessionModels());
-        while(!sessionsWithPending.isEmpty())
+        return new ProcessPendingIterator();
+    }
+
+    private class ProcessPendingIterator implements Iterator<Runnable>
+    {
+        private final List<? extends AMQSessionModel<?>> _sessionsWithPending;
+        private Iterator<? extends AMQSessionModel<?>> _sessionIterator;
+        private ProcessPendingIterator()
+        {
+            _sessionsWithPending = new ArrayList<>(getSessionModels());
+            _sessionIterator = _sessionsWithPending.iterator();
+        }
+
+        @Override
+        public boolean hasNext()
         {
-            final Iterator<? extends AMQSessionModel<?>> iter = sessionsWithPending.iterator();
-            AMQSessionModel<?> session;
-            while(iter.hasNext())
+            return !(_sessionsWithPending.isEmpty() && _asyncTaskList.isEmpty());
+        }
+
+        @Override
+        public Runnable next()
+        {
+            if(!_sessionsWithPending.isEmpty())
             {
-                session = iter.next();
-                if(!session.processPending())
+                if(!_sessionIterator.hasNext())
                 {
-                    iter.remove();
+                    _sessionIterator = _sessionsWithPending.iterator();
                 }
+                final AMQSessionModel<?> session = _sessionIterator.next();
+                return new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        if(!session.processPending())
+                        {
+                            _sessionIterator.remove();
+                        }
+                    }
+                };
+            }
+            else if(!_asyncTaskList.isEmpty())
+            {
+                final Action<? super ServerConnection> asyncAction = _asyncTaskList.poll();
+                return new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        asyncAction.performAction(ServerConnection.this);
+                    }
+                };
+            }
+            else
+            {
+                throw new NoSuchElementException();
             }
         }
 
-        while(_asyncTaskList.peek() != null)
+        @Override
+        public void remove()
         {
-            Action<? super ServerConnection> asyncAction = _asyncTaskList.poll();
-            asyncAction.performAction(this);
+            throw new UnsupportedOperationException();
         }
     }
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1714003&r1=1714002&r2=1714003&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Thu Nov 12 10:24:24 2015
@@ -31,11 +31,13 @@ import java.security.AccessController;
 import java.security.Principal;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
@@ -216,10 +218,13 @@ public class AMQPConnection_0_8
     @Override
     public void setTransportBlockedForWriting(final boolean blocked)
     {
-        _transportBlockedForWriting = blocked;
-        for(AMQChannel channel : _channelMap.values())
+        if(_transportBlockedForWriting != blocked)
         {
-            channel.transportStateChanged();
+            _transportBlockedForWriting = blocked;
+            for (AMQChannel channel : _channelMap.values())
+            {
+                channel.transportStateChanged();
+            }
         }
     }
 
@@ -277,7 +282,8 @@ public class AMQPConnection_0_8
                     if (_virtualHost.getState() == State.ACTIVE)
                     {
                         throw new ServerScopedRuntimeException(e);
-                    } else
+                    }
+                    else
                     {
                         throw new ConnectionScopedRuntimeException(e);
                     }
@@ -1517,37 +1523,6 @@ public class AMQPConnection_0_8
     }
 
     @Override
-    public void processPending()
-    {
-        if (!isIOThread())
-        {
-            return;
-        }
-
-        List<? extends AMQSessionModel<?>> sessionsWithPending = new ArrayList<>(getSessionModels());
-        while(!sessionsWithPending.isEmpty())
-        {
-            final Iterator<? extends AMQSessionModel<?>> iter = sessionsWithPending.iterator();
-            AMQSessionModel<?> session;
-            while(iter.hasNext())
-            {
-                session = iter.next();
-                if(!session.processPending())
-                {
-                    iter.remove();
-                }
-            }
-        }
-
-        while(_asyncTaskList.peek() != null)
-        {
-            Action<? super AMQPConnection_0_8> asyncAction = _asyncTaskList.poll();
-            asyncAction.performAction(this);
-        }
-
-    }
-
-    @Override
     public boolean hasWork()
     {
         return _stateChanged.get();
@@ -1577,4 +1552,77 @@ public class AMQPConnection_0_8
     {
         _workListener.set(listener);
     }
+
+    @Override
+    public Iterator<Runnable> processPendingIterator()
+    {
+        if (!isIOThread())
+        {
+            return Collections.emptyIterator();
+        }
+        return new ProcessPendingIterator();
+    }
+
+    private class ProcessPendingIterator implements Iterator<Runnable>
+    {
+        private final List<? extends AMQSessionModel<?>> _sessionsWithPending;
+        private Iterator<? extends AMQSessionModel<?>> _sessionIterator;
+        private ProcessPendingIterator()
+        {
+            _sessionsWithPending = new ArrayList<>(getSessionModels());
+            _sessionIterator = _sessionsWithPending.iterator();
+        }
+
+        @Override
+        public boolean hasNext()
+        {
+            return !(_sessionsWithPending.isEmpty() && _asyncTaskList.isEmpty());
+        }
+
+        @Override
+        public Runnable next()
+        {
+            if(!_sessionsWithPending.isEmpty())
+            {
+                if(!_sessionIterator.hasNext())
+                {
+                    _sessionIterator = _sessionsWithPending.iterator();
+                }
+                final AMQSessionModel<?> session = _sessionIterator.next();
+                return new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        if(!session.processPending())
+                        {
+                            _sessionIterator.remove();
+                        }
+                    }
+                };
+            }
+            else if(!_asyncTaskList.isEmpty())
+            {
+                final Action<? super AMQPConnection_0_8> asyncAction = _asyncTaskList.poll();
+                return new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        asyncAction.performAction(AMQPConnection_0_8.this);
+                    }
+                };
+            }
+            else
+            {
+                throw new NoSuchElementException();
+            }
+        }
+
+        @Override
+        public void remove()
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1714003&r1=1714002&r2=1714003&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Thu Nov 12 10:24:24 2015
@@ -25,6 +25,8 @@ import java.nio.ByteBuffer;
 import java.security.AccessController;
 import java.security.Principal;
 import java.security.PrivilegedAction;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -545,7 +547,7 @@ public class AMQPConnection_1_0 extends
 
     public void close()
     {
-        getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis()+ CLOSE_RESPONSE_TIMEOUT,
+        getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_RESPONSE_TIMEOUT,
                                                                    getNetwork()));
 
     }
@@ -558,17 +560,24 @@ public class AMQPConnection_1_0 extends
     @Override
     public void setTransportBlockedForWriting(final boolean blocked)
     {
-        _transportBlockedForWriting = blocked;
-        _connection.transportStateChanged();
+        if(_transportBlockedForWriting != blocked)
+        {
+            _transportBlockedForWriting = blocked;
+            _connection.transportStateChanged();
+        }
 
     }
 
     @Override
-    public void processPending()
+    public Iterator<Runnable> processPendingIterator()
     {
         if (isIOThread())
         {
-            _connection.processPending();
+            return _connection.processPendingIterator();
+        }
+        else
+        {
+            return Collections.emptyIterator();
         }
     }
 

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1714003&r1=1714002&r2=1714003&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Thu Nov 12 10:24:24 2015
@@ -32,6 +32,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -378,29 +379,72 @@ public class Connection_1_0 implements C
         _amqpConnection.notifyWork();
     }
 
-    public void processPending()
+    public Iterator<Runnable> processPendingIterator()
     {
-        List<? extends AMQSessionModel<?>> sessionsWithPending = new ArrayList<>(getSessionModels());
-        while(!sessionsWithPending.isEmpty())
+        return new ProcessPendingIterator();
+    }
+
+    private class ProcessPendingIterator implements Iterator<Runnable>
+    {
+        private final List<? extends AMQSessionModel<?>> _sessionsWithPending;
+        private Iterator<? extends AMQSessionModel<?>> _sessionIterator;
+        private ProcessPendingIterator()
+        {
+            _sessionsWithPending = new ArrayList<>(getSessionModels());
+            _sessionIterator = _sessionsWithPending.iterator();
+        }
+
+        @Override
+        public boolean hasNext()
+        {
+            return !(_sessionsWithPending.isEmpty() && _asyncTaskList.isEmpty());
+        }
+
+        @Override
+        public Runnable next()
         {
-            final Iterator<? extends AMQSessionModel<?>> iter = sessionsWithPending.iterator();
-            AMQSessionModel<?> session;
-            while(iter.hasNext())
+            if(!_sessionsWithPending.isEmpty())
             {
-                session = iter.next();
-                if(!session.processPending())
+                if(!_sessionIterator.hasNext())
                 {
-                    iter.remove();
+                    _sessionIterator = _sessionsWithPending.iterator();
                 }
+                final AMQSessionModel<?> session = _sessionIterator.next();
+                return new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        if(!session.processPending())
+                        {
+                            _sessionIterator.remove();
+                        }
+                    }
+                };
+            }
+            else if(!_asyncTaskList.isEmpty())
+            {
+                final Action<? super Connection_1_0> asyncAction = _asyncTaskList.poll();
+                return new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        asyncAction.performAction(Connection_1_0.this);
+                    }
+                };
+            }
+            else
+            {
+                throw new NoSuchElementException();
             }
         }
 
-        while(_asyncTaskList.peek() != null)
+        @Override
+        public void remove()
         {
-            Action<? super Connection_1_0> asyncAction = _asyncTaskList.poll();
-            asyncAction.performAction(this);
+            throw new UnsupportedOperationException();
         }
-
     }
 
     @Override

Modified: qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1714003&r1=1714002&r2=1714003&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Thu Nov 12 10:24:24 2015
@@ -28,6 +28,7 @@ import java.security.cert.Certificate;
 import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -236,8 +237,11 @@ class WebSocketProvider implements Accep
 
                 _protocolEngine.clearWork();
                 _protocolEngine.setMessageAssignmentSuspended(true);
-
-                _protocolEngine.processPending();
+                Iterator<Runnable> iter = _protocolEngine.processPendingIterator();
+                while(iter.hasNext())
+                {
+                    iter.next().run();
+                }
 
                 QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(length);
                 buffer.put(data,offset,length);
@@ -443,7 +447,11 @@ class WebSocketProvider implements Accep
             _protocolEngine.clearWork();
             _protocolEngine.setMessageAssignmentSuspended(true);
 
-            _protocolEngine.processPending();
+            Iterator<Runnable> iter = _protocolEngine.processPendingIterator();
+            while(iter.hasNext())
+            {
+                iter.next().run();
+            }
 
             doWrite();
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org