You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/11/12 11:24:24 UTC
svn commit: r1714003 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/transport/
broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/
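broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/
broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/
broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/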
Author: rgodfrey
Date: Thu Nov 12 10:24:24 2015
New Revision: 1714003
URL: http://svn.apache.org/viewvc?rev=1714003&view=rev
Log:
QPID-6840 : interleave calls to process pending work with network writes
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1714003&r1=1714002&r2=1714003&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java Thu Nov 12 10:24:24 2015
@@ -24,6 +24,8 @@ package org.apache.qpid.server.transport
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.cert.Certificate;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -197,9 +199,9 @@ public class MultiVersionProtocolEngine
}
@Override
- public void processPending()
+ public Iterator<Runnable> processPendingIterator()
{
- _delegate.processPending();
+ return _delegate.processPendingIterator();
}
@Override
@@ -249,9 +251,9 @@ public class MultiVersionProtocolEngine
}
@Override
- public void processPending()
+ public Iterator<Runnable> processPendingIterator()
{
-
+ return Collections.emptyIterator();
}
@Override
@@ -370,9 +372,9 @@ public class MultiVersionProtocolEngine
}
@Override
- public void processPending()
+ public Iterator<Runnable> processPendingIterator()
{
-
+ return Collections.emptyIterator();
}
@Override
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1714003&r1=1714002&r2=1714003&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Thu Nov 12 10:24:24 2015
@@ -26,6 +26,7 @@ import java.nio.channels.SocketChannel;
import java.security.Principal;
import java.security.cert.Certificate;
import java.util.Collection;
+import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -71,6 +72,7 @@ public class NonBlockingConnection imple
private volatile boolean _unexpectedByteBufferSizeReported;
private final String _threadName;
private volatile SelectorThread.SelectionTask _selectionTask;
+ private Iterator<Runnable> _pendingItertor;
public NonBlockingConnection(SocketChannel socketChannel,
ProtocolEngine protocolEngine,
@@ -211,12 +213,12 @@ public class NonBlockingConnection imple
}
}
- public boolean canRead()
+ public boolean wantsRead()
{
return _fullyWritten;
}
- public boolean canWrite()
+ public boolean wantsWrite()
{
return !_fullyWritten;
}
@@ -243,16 +245,33 @@ public class NonBlockingConnection imple
_protocolEngine.setIOThread(Thread.currentThread());
_protocolEngine.setMessageAssignmentSuspended(true);
- if (!_fullyWritten)
+ if(_pendingItertor == null)
{
- doWrite();
+ _pendingItertor = _protocolEngine.processPendingIterator();
}
- if (_fullyWritten)
+ while(_pendingItertor.hasNext())
{
- _protocolEngine.processPending();
+ long size = getBufferedSize();
+ if(size >= _port.getNetworkBufferSize())
+ {
+ doWrite();
+ if((size - getBufferedSize()) < (_port.getNetworkBufferSize()/2))
+ {
+ break;
+ }
+ }
+ else
+ {
+ final Runnable task = _pendingItertor.next();
+ task.run();
+ }
+ }
- _protocolEngine.setTransportBlockedForWriting(!doWrite());
+ if (!_pendingItertor.hasNext())
+ {
+ _pendingItertor = null;
+ _protocolEngine.setTransportBlockedForWriting(false);
boolean dataRead = doRead();
_protocolEngine.setTransportBlockedForWriting(!doWrite());
@@ -306,6 +325,16 @@ public class NonBlockingConnection imple
}
+ private long getBufferedSize()
+ {
+ long totalSize = 0l;
+ for(QpidByteBuffer buf : _buffers)
+ {
+ totalSize += buf.remaining();
+ }
+ return totalSize;
+ }
+
private void shutdown()
{
try
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?rev=1714003&r1=1714002&r2=1714003&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java Thu Nov 12 10:24:24 2015
@@ -20,13 +20,12 @@
*/
package org.apache.qpid.server.transport;
-import java.nio.ByteBuffer;
+import java.util.Iterator;
import javax.security.auth.Subject;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.util.Action;
-import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.TransportActivity;
@@ -37,43 +36,43 @@ import org.apache.qpid.transport.network
public interface ProtocolEngine extends TransportActivity
{
- // Called by the NetworkDriver when the socket has been closed for reading
- void closed();
+ // Called by the NetworkDriver when the socket has been closed for reading
+ void closed();
- // Called when the NetworkEngine has not written data for the specified period of time (will trigger a
- // heartbeat)
- @Override
- void writerIdle();
+ // Called when the NetworkEngine has not written data for the specified period of time (will trigger a
+ // heartbeat)
+ @Override
+ void writerIdle();
- // Called when the NetworkEngine has not read data for the specified period of time (will close the connection)
- @Override
- void readerIdle();
+ // Called when the NetworkEngine has not read data for the specified period of time (will close the connection)
+ @Override
+ void readerIdle();
- Subject getSubject();
+ Subject getSubject();
- boolean isTransportBlockedForWriting();
+ boolean isTransportBlockedForWriting();
- void setTransportBlockedForWriting(boolean blocked);
+ void setTransportBlockedForWriting(boolean blocked);
- void setMessageAssignmentSuspended(boolean value);
+ void setMessageAssignmentSuspended(boolean value);
- boolean isMessageAssignmentSuspended();
+ boolean isMessageAssignmentSuspended();
- void processPending();
+ Iterator<Runnable> processPendingIterator();
- boolean hasWork();
+ boolean hasWork();
- void clearWork();
+ void clearWork();
- void notifyWork();
+ void notifyWork();
- void setWorkListener(Action<ProtocolEngine> listener);
+ void setWorkListener(Action<ProtocolEngine> listener);
- AggregateTicker getAggregateTicker();
+ AggregateTicker getAggregateTicker();
- void encryptedTransport();
+ void encryptedTransport();
- void received(QpidByteBuffer msg);
+ void received(QpidByteBuffer msg);
- void setIOThread(Thread ioThread);
+ void setIOThread(Thread ioThread);
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1714003&r1=1714002&r2=1714003&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Thu Nov 12 10:24:24 2015
@@ -248,8 +248,8 @@ class SelectorThread extends Thread
getUnscheduledConnections().add(unregisteredConnection);
- final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0)
- | (unregisteredConnection.canWrite() ? SelectionKey.OP_WRITE : 0);
+ final int ops = (unregisteredConnection.wantsRead() ? SelectionKey.OP_READ : 0)
+ | (unregisteredConnection.wantsWrite() ? SelectionKey.OP_WRITE : 0);
try
{
unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
@@ -509,8 +509,8 @@ class SelectorThread extends Thread
if(selectionTask != null)
{
final SelectionKey selectionKey = connection.getSocketChannel().keyFor(selectionTask.getSelector());
- int expectedOps = (connection.canRead() ? SelectionKey.OP_READ : 0)
- | (connection.canWrite() ? SelectionKey.OP_WRITE : 0);
+ int expectedOps = (connection.wantsRead() ? SelectionKey.OP_READ : 0)
+ | (connection.wantsWrite() ? SelectionKey.OP_WRITE : 0);
try
{
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1714003&r1=1714002&r2=1714003&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Thu Nov 12 10:24:24 2015
@@ -25,6 +25,8 @@ import java.net.SocketAddress;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -246,16 +248,23 @@ public class AMQPConnection_0_10 extends
@Override
public void setTransportBlockedForWriting(final boolean blocked)
{
- _transportBlockedForWriting = blocked;
- _connection.transportStateChanged();
+ if(_transportBlockedForWriting != blocked)
+ {
+ _transportBlockedForWriting = blocked;
+ _connection.transportStateChanged();
+ }
}
@Override
- public void processPending()
+ public Iterator<Runnable> processPendingIterator()
{
if (isIOThread())
{
- _connection.processPending();
+ return _connection.processPendingIterator();
+ }
+ else
+ {
+ return Collections.emptyIterator();
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1714003&r1=1714002&r2=1714003&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Thu Nov 12 10:24:24 2015
@@ -32,8 +32,10 @@ import java.security.Principal;
import java.security.PrivilegedAction;
import java.text.MessageFormat;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
@@ -578,27 +580,71 @@ public class ServerConnection extends Co
_amqpConnection.notifyWork();
}
- public void processPending()
+ public Iterator<Runnable> processPendingIterator()
{
- List<? extends AMQSessionModel<?>> sessionsWithPending = new ArrayList<>(getSessionModels());
- while(!sessionsWithPending.isEmpty())
+ return new ProcessPendingIterator();
+ }
+
+ private class ProcessPendingIterator implements Iterator<Runnable>
+ {
+ private final List<? extends AMQSessionModel<?>> _sessionsWithPending;
+ private Iterator<? extends AMQSessionModel<?>> _sessionIterator;
+ private ProcessPendingIterator()
+ {
+ _sessionsWithPending = new ArrayList<>(getSessionModels());
+ _sessionIterator = _sessionsWithPending.iterator();
+ }
+
+ @Override
+ public boolean hasNext()
{
- final Iterator<? extends AMQSessionModel<?>> iter = sessionsWithPending.iterator();
- AMQSessionModel<?> session;
- while(iter.hasNext())
+ return !(_sessionsWithPending.isEmpty() && _asyncTaskList.isEmpty());
+ }
+
+ @Override
+ public Runnable next()
+ {
+ if(!_sessionsWithPending.isEmpty())
{
- session = iter.next();
- if(!session.processPending())
+ if(!_sessionIterator.hasNext())
{
- iter.remove();
+ _sessionIterator = _sessionsWithPending.iterator();
}
+ final AMQSessionModel<?> session = _sessionIterator.next();
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ if(!session.processPending())
+ {
+ _sessionIterator.remove();
+ }
+ }
+ };
+ }
+ else if(!_asyncTaskList.isEmpty())
+ {
+ final Action<? super ServerConnection> asyncAction = _asyncTaskList.poll();
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ asyncAction.performAction(ServerConnection.this);
+ }
+ };
+ }
+ else
+ {
+ throw new NoSuchElementException();
}
}
- while(_asyncTaskList.peek() != null)
+ @Override
+ public void remove()
{
- Action<? super ServerConnection> asyncAction = _asyncTaskList.poll();
- asyncAction.performAction(this);
+ throw new UnsupportedOperationException();
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1714003&r1=1714002&r2=1714003&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Thu Nov 12 10:24:24 2015
@@ -31,11 +31,13 @@ import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
@@ -216,10 +218,13 @@ public class AMQPConnection_0_8
@Override
public void setTransportBlockedForWriting(final boolean blocked)
{
- _transportBlockedForWriting = blocked;
- for(AMQChannel channel : _channelMap.values())
+ if(_transportBlockedForWriting != blocked)
{
- channel.transportStateChanged();
+ _transportBlockedForWriting = blocked;
+ for (AMQChannel channel : _channelMap.values())
+ {
+ channel.transportStateChanged();
+ }
}
}
@@ -277,7 +282,8 @@ public class AMQPConnection_0_8
if (_virtualHost.getState() == State.ACTIVE)
{
throw new ServerScopedRuntimeException(e);
- } else
+ }
+ else
{
throw new ConnectionScopedRuntimeException(e);
}
@@ -1517,37 +1523,6 @@ public class AMQPConnection_0_8
}
@Override
- public void processPending()
- {
- if (!isIOThread())
- {
- return;
- }
-
- List<? extends AMQSessionModel<?>> sessionsWithPending = new ArrayList<>(getSessionModels());
- while(!sessionsWithPending.isEmpty())
- {
- final Iterator<? extends AMQSessionModel<?>> iter = sessionsWithPending.iterator();
- AMQSessionModel<?> session;
- while(iter.hasNext())
- {
- session = iter.next();
- if(!session.processPending())
- {
- iter.remove();
- }
- }
- }
-
- while(_asyncTaskList.peek() != null)
- {
- Action<? super AMQPConnection_0_8> asyncAction = _asyncTaskList.poll();
- asyncAction.performAction(this);
- }
-
- }
-
- @Override
public boolean hasWork()
{
return _stateChanged.get();
@@ -1577,4 +1552,77 @@ public class AMQPConnection_0_8
{
_workListener.set(listener);
}
+
+ @Override
+ public Iterator<Runnable> processPendingIterator()
+ {
+ if (!isIOThread())
+ {
+ return Collections.emptyIterator();
+ }
+ return new ProcessPendingIterator();
+ }
+
+ private class ProcessPendingIterator implements Iterator<Runnable>
+ {
+ private final List<? extends AMQSessionModel<?>> _sessionsWithPending;
+ private Iterator<? extends AMQSessionModel<?>> _sessionIterator;
+ private ProcessPendingIterator()
+ {
+ _sessionsWithPending = new ArrayList<>(getSessionModels());
+ _sessionIterator = _sessionsWithPending.iterator();
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return !(_sessionsWithPending.isEmpty() && _asyncTaskList.isEmpty());
+ }
+
+ @Override
+ public Runnable next()
+ {
+ if(!_sessionsWithPending.isEmpty())
+ {
+ if(!_sessionIterator.hasNext())
+ {
+ _sessionIterator = _sessionsWithPending.iterator();
+ }
+ final AMQSessionModel<?> session = _sessionIterator.next();
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ if(!session.processPending())
+ {
+ _sessionIterator.remove();
+ }
+ }
+ };
+ }
+ else if(!_asyncTaskList.isEmpty())
+ {
+ final Action<? super AMQPConnection_0_8> asyncAction = _asyncTaskList.poll();
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ asyncAction.performAction(AMQPConnection_0_8.this);
+ }
+ };
+ }
+ else
+ {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1714003&r1=1714002&r2=1714003&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Thu Nov 12 10:24:24 2015
@@ -25,6 +25,8 @@ import java.nio.ByteBuffer;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -545,7 +547,7 @@ public class AMQPConnection_1_0 extends
public void close()
{
- getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis()+ CLOSE_RESPONSE_TIMEOUT,
+ getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_RESPONSE_TIMEOUT,
getNetwork()));
}
@@ -558,17 +560,24 @@ public class AMQPConnection_1_0 extends
@Override
public void setTransportBlockedForWriting(final boolean blocked)
{
- _transportBlockedForWriting = blocked;
- _connection.transportStateChanged();
+ if(_transportBlockedForWriting != blocked)
+ {
+ _transportBlockedForWriting = blocked;
+ _connection.transportStateChanged();
+ }
}
@Override
- public void processPending()
+ public Iterator<Runnable> processPendingIterator()
{
if (isIOThread())
{
- _connection.processPending();
+ return _connection.processPendingIterator();
+ }
+ else
+ {
+ return Collections.emptyIterator();
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1714003&r1=1714002&r2=1714003&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Thu Nov 12 10:24:24 2015
@@ -32,6 +32,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -378,29 +379,72 @@ public class Connection_1_0 implements C
_amqpConnection.notifyWork();
}
- public void processPending()
+ public Iterator<Runnable> processPendingIterator()
{
- List<? extends AMQSessionModel<?>> sessionsWithPending = new ArrayList<>(getSessionModels());
- while(!sessionsWithPending.isEmpty())
+ return new ProcessPendingIterator();
+ }
+
+ private class ProcessPendingIterator implements Iterator<Runnable>
+ {
+ private final List<? extends AMQSessionModel<?>> _sessionsWithPending;
+ private Iterator<? extends AMQSessionModel<?>> _sessionIterator;
+ private ProcessPendingIterator()
+ {
+ _sessionsWithPending = new ArrayList<>(getSessionModels());
+ _sessionIterator = _sessionsWithPending.iterator();
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return !(_sessionsWithPending.isEmpty() && _asyncTaskList.isEmpty());
+ }
+
+ @Override
+ public Runnable next()
{
- final Iterator<? extends AMQSessionModel<?>> iter = sessionsWithPending.iterator();
- AMQSessionModel<?> session;
- while(iter.hasNext())
+ if(!_sessionsWithPending.isEmpty())
{
- session = iter.next();
- if(!session.processPending())
+ if(!_sessionIterator.hasNext())
{
- iter.remove();
+ _sessionIterator = _sessionsWithPending.iterator();
}
+ final AMQSessionModel<?> session = _sessionIterator.next();
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ if(!session.processPending())
+ {
+ _sessionIterator.remove();
+ }
+ }
+ };
+ }
+ else if(!_asyncTaskList.isEmpty())
+ {
+ final Action<? super Connection_1_0> asyncAction = _asyncTaskList.poll();
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ asyncAction.performAction(Connection_1_0.this);
+ }
+ };
+ }
+ else
+ {
+ throw new NoSuchElementException();
}
}
- while(_asyncTaskList.peek() != null)
+ @Override
+ public void remove()
{
- Action<? super Connection_1_0> asyncAction = _asyncTaskList.poll();
- asyncAction.performAction(this);
+ throw new UnsupportedOperationException();
}
-
}
@Override
Modified: qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1714003&r1=1714002&r2=1714003&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Thu Nov 12 10:24:24 2015
@@ -28,6 +28,7 @@ import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -236,8 +237,11 @@ class WebSocketProvider implements Accep
_protocolEngine.clearWork();
_protocolEngine.setMessageAssignmentSuspended(true);
-
- _protocolEngine.processPending();
+ Iterator<Runnable> iter = _protocolEngine.processPendingIterator();
+ while(iter.hasNext())
+ {
+ iter.next().run();
+ }
QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(length);
buffer.put(data,offset,length);
@@ -443,7 +447,11 @@ class WebSocketProvider implements Accep
_protocolEngine.clearWork();
_protocolEngine.setMessageAssignmentSuspended(true);
- _protocolEngine.processPending();
+ Iterator<Runnable> iter = _protocolEngine.processPendingIterator();
+ while(iter.hasNext())
+ {
+ iter.next().run();
+ }
doWrite();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org