You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/03/03 15:15:31 UTC

svn commit: r1663708 - in /qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java: broker-core/src/main/java/org/apache/qpid/server/protocol/ broker-core/src/main/java/org/apache/qpid/server/transport/ broker-core/src/test/java/org/apache/qpid/server/consumer...

Author: kwall
Date: Tue Mar  3 14:15:30 2015
New Revision: 1663708

URL: http://svn.apache.org/r1663708
Log:
Channel block/unblock is now asynchronous; remove unnecessary selector wakeups.

Modified:
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Tue Mar  3 14:15:30 2015
@@ -116,5 +116,5 @@ public interface AMQSessionModel<T exten
 
     void transportStateChanged();
 
-    void processPendingMessages();
+    void processPending();
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Tue Mar  3 14:15:30 2015
@@ -623,6 +623,8 @@ public class NonBlockingConnection imple
     @Override
     public void send(final ByteBuffer msg)
     {
+        assert Thread.currentThread().getName().startsWith(SelectorThread.IO_THREAD_NAME_PREFIX) : "Send called by unexpected thread " + Thread.currentThread().getName();
+
         if (_closed.get())
         {
             throw new SenderClosedException("I/O for thread " + _remoteSocketAddress + " is already closed");
@@ -634,7 +636,5 @@ public class NonBlockingConnection imple
     @Override
     public void flush()
     {
-        getSelector().wakeup();
-
     }
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Tue Mar  3 14:15:30 2015
@@ -36,11 +36,9 @@ import java.util.concurrent.ScheduledThr
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-/**
-* Created by keith on 28/01/2015.
-*/
 public class SelectorThread extends Thread
 {
+    public static final String IO_THREAD_NAME_PREFIX  = "NCS-";
     private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>();
     private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>();
     private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>();
@@ -289,7 +287,8 @@ public class SelectorThread extends Thre
                                     String currentName = Thread.currentThread().getName();
                                     try
                                     {
-                                        Thread.currentThread().setName("NCS-"+connection.getRemoteAddress().toString());
+                                        Thread.currentThread().setName(
+                                                IO_THREAD_NAME_PREFIX + connection.getRemoteAddress().toString());
                                         processConnection(connection);
                                     }
                                     finally

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Tue Mar  3 14:15:30 2015
@@ -488,7 +488,7 @@ public class MockConsumer implements Con
         }
 
         @Override
-        public void processPendingMessages()
+        public void processPending()
         {
 
         }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Tue Mar  3 14:15:30 2015
@@ -632,7 +632,6 @@ public class ConsumerTarget_0_10 extends
 
     public void flushBatched()
     {
-        _session.getConnection().flush();
     }
 
 

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Tue Mar  3 14:15:30 2015
@@ -70,7 +70,6 @@ public class ServerConnection extends Co
 {
 
     private final Broker<?> _broker;
-    private Runnable _onOpenTask;
     private AtomicBoolean _logClosed = new AtomicBoolean(false);
 
     private final Subject _authorizedSubject = new Subject();
@@ -79,10 +78,10 @@ public class ServerConnection extends Co
     private final long _connectionId;
     private final Object _reference = new Object();
     private VirtualHostImpl<?,?,?> _virtualHost;
-    private AmqpPort<?> _port;
-    private AtomicLong _lastIoTime = new AtomicLong();
+    private final AmqpPort<?> _port;
+    private final AtomicLong _lastIoTime = new AtomicLong();
     private boolean _blocking;
-    private Transport _transport;
+    private final Transport _transport;
 
     private final CopyOnWriteArrayList<Action<? super ServerConnection>> _connectionCloseTaskList =
             new CopyOnWriteArrayList<Action<? super ServerConnection>>();
@@ -95,7 +94,7 @@ public class ServerConnection extends Co
 
     private volatile boolean _stopped;
     private int _messageCompressionThreshold;
-    private int _maxMessageSize;
+    private final int _maxMessageSize;
 
     private ServerProtocolEngine _serverProtocolEngine;
 
@@ -149,10 +148,6 @@ public class ServerConnection extends Co
 
         if (state == State.OPEN)
         {
-            if (_onOpenTask != null)
-            {
-                _onOpenTask.run();
-            }
             getEventLogger().message(ConnectionMessages.OPEN(getClientId(),
                                                              "0-10",
                                                              getClientVersion(),
@@ -256,11 +251,6 @@ public class ServerConnection extends Co
         return _stopped;
     }
 
-    public void onOpen(final Runnable task)
-    {
-        _onOpenTask = task;
-    }
-
     public void closeSessionAsync(final ServerSession session, final AMQConstant cause, final String message)
     {
         addAsyncTask(new Action<ServerConnection>()
@@ -740,7 +730,7 @@ public class ServerConnection extends Co
 
         for (AMQSessionModel session : getSessionModels())
         {
-            session.processPendingMessages();
+            session.processPending();
         }
 
     }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Tue Mar  3 14:15:30 2015
@@ -137,6 +137,7 @@ public class ServerSession extends Sessi
     private org.apache.qpid.server.model.Session<?> _modelObject;
     private long _blockTime;
     private long _blockingTimeout;
+    private boolean _wireBlockingState;
 
     public static interface MessageDispositionChangeListener
     {
@@ -208,10 +209,6 @@ public class ServerSession extends Sessi
             if (state == State.OPEN)
             {
                 getVirtualHost().getEventLogger().message(ChannelMessages.CREATE());
-                if(_blocking.get())
-                {
-                    invokeBlock();
-                }
             }
         }
         else
@@ -245,6 +242,17 @@ public class ServerSession extends Sessi
         invoke(new MessageStop(""));
     }
 
+    private void invokeUnblock()
+    {
+        MessageFlow mf = new MessageFlow();
+        mf.setUnit(MessageCreditUnit.MESSAGE);
+        mf.setDestination("");
+        _outstandingCredit.set(Integer.MAX_VALUE);
+        mf.setValue(Integer.MAX_VALUE);
+        invoke(mf);
+    }
+
+
     @Override
     protected boolean isFull(int id)
     {
@@ -824,12 +832,11 @@ public class ServerSession extends Sessi
 
                 if(_blocking.compareAndSet(false,true))
                 {
+                    getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
                     if(getState() == State.OPEN)
                     {
-                        invokeBlock();
+                        getConnection().notifyWork();
                     }
-                    _blockTime = System.currentTimeMillis();
-                    getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
                 }
 
 
@@ -853,24 +860,17 @@ public class ServerSession extends Sessi
         {
             if(_blocking.compareAndSet(true,false) && !isClosing())
             {
-                _blockTime = 0l;
                 getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
-                MessageFlow mf = new MessageFlow();
-                mf.setUnit(MessageCreditUnit.MESSAGE);
-                mf.setDestination("");
-                _outstandingCredit.set(Integer.MAX_VALUE);
-                mf.setValue(Integer.MAX_VALUE);
-                invoke(mf);
-
-
+                getConnection().notifyWork();
             }
         }
     }
 
+
     boolean blockingTimeoutExceeded()
     {
         long blockTime = _blockTime;
-        boolean b = _blocking.get() && blockTime != 0 && (System.currentTimeMillis() - blockTime) > _blockingTimeout;
+        boolean b = _wireBlockingState && blockTime != 0 && (System.currentTimeMillis() - blockTime) > _blockingTimeout;
         return b;
     }
 
@@ -1136,8 +1136,25 @@ public class ServerSession extends Sessi
     }
 
     @Override
-    public void processPendingMessages()
+    public void processPending()
     {
+        boolean desiredBlockingState = _blocking.get();
+        if (desiredBlockingState != _wireBlockingState)
+        {
+            _wireBlockingState = desiredBlockingState;
+
+            if (desiredBlockingState)
+            {
+                invokeBlock();
+            }
+            else
+            {
+                invokeUnblock();
+            }
+            _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
+        }
+
+
         for(ConsumerTarget target : getSubscriptions())
         {
             target.processPending();

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Tue Mar  3 14:15:30 2015
@@ -209,6 +209,8 @@ public class AMQChannel
     private final List<StoredMessage<MessageMetaData>> _uncommittedMessages = new ArrayList<>();
     private long _maxUncommittedInMemorySize;
 
+    private boolean _wireBlockingState;
+
     public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore)
     {
         _creditManager = new Pre0_10CreditManager(0l,0l, connection);
@@ -1611,12 +1613,14 @@ public class AMQChannel
     {
         if(_blockingEntities.add(this))
         {
+
             if(_blocking.compareAndSet(false,true))
             {
                 getVirtualHost().getEventLogger().message(_logSubject,
                                                           ChannelMessages.FLOW_ENFORCED("** All Queues **"));
-                flow(false);
-                _blockTime = System.currentTimeMillis();
+
+
+                getConnection().notifyWork();
             }
         }
     }
@@ -1628,8 +1632,7 @@ public class AMQChannel
             if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false))
             {
                 getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
-
-                flow(true);
+                getConnection().notifyWork();
             }
         }
     }
@@ -1643,8 +1646,7 @@ public class AMQChannel
             if(_blocking.compareAndSet(false,true))
             {
                 getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getName()));
-                flow(false);
-                _blockTime = System.currentTimeMillis();
+                getConnection().notifyWork();
 
             }
         }
@@ -1657,7 +1659,7 @@ public class AMQChannel
             if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false) && !isClosing())
             {
                 getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
-                flow(true);
+                getConnection().notifyWork();
             }
         }
     }
@@ -2262,7 +2264,7 @@ public class AMQChannel
     private boolean blockingTimeoutExceeded()
     {
 
-        return _blocking.get() && (System.currentTimeMillis() - _blockTime) > _blockingTimeout;
+        return _wireBlockingState && (System.currentTimeMillis() - _blockTime) > _blockingTimeout;
     }
 
     @Override
@@ -3598,9 +3600,17 @@ public class AMQChannel
     }
 
     @Override
-    public void processPendingMessages()
+    public void processPending()
     {
 
+        boolean desiredBlockingState = _blocking.get();
+        if (desiredBlockingState != _wireBlockingState)
+        {
+            _wireBlockingState = desiredBlockingState;
+            flow(!desiredBlockingState);
+            _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
+        }
+
         for(ConsumerTarget target : _tag2SubscriptionTargetMap.values())
         {
             target.processPending();

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Tue Mar  3 14:15:30 2015
@@ -44,8 +44,6 @@ import java.util.concurrent.CopyOnWriteA
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import javax.security.auth.Subject;
 import javax.security.sasl.SaslException;
@@ -148,11 +146,8 @@ public class AMQProtocolEngine implement
      * The channels that the latest call to {@link #received(ByteBuffer)} applied to.
      * Used so we know which channels we need to call {@link AMQChannel#receivedComplete()}
      * on after handling the frames.
-     *
-     * Thread-safety: guarded by {@link #_receivedLock}.
      */
-    private final Set<AMQChannel> _channelsForCurrentMessage =
-            new HashSet<>();
+    private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<>();
 
     private AMQDecoder _decoder;
 
@@ -197,7 +192,6 @@ public class AMQProtocolEngine implement
     private long _lastReceivedTime = System.currentTimeMillis();  // TODO consider if this is what we want?
     private boolean _blocking;
 
-    private final ReentrantLock _receivedLock;
     private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis());
     private final Broker<?> _broker;
     private final Transport _transport;
@@ -251,7 +245,6 @@ public class AMQProtocolEngine implement
         _port = port;
         _transport = transport;
         _maxNoOfChannels = broker.getConnection_sessionCountLimit();
-        _receivedLock = new ReentrantLock();
         _decoder = new BrokerDecoder(this);
         _connectionID = connectionId;
         _logSubject = new ConnectionLogSubject(this);
@@ -545,43 +538,8 @@ public class AMQProtocolEngine implement
 
 
     private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY];
-    private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes);
     private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes);
 
-    private ByteBuffer asByteBuffer(AMQDataBlock block)
-    {
-        final int size = (int) block.getSize();
-
-        final byte[] data;
-
-
-        if(size > REUSABLE_BYTE_BUFFER_CAPACITY)
-        {
-            data= new byte[size];
-        }
-        else
-        {
-
-            data = _reusableBytes;
-        }
-        _reusableDataOutput.setBuffer(data);
-
-        try
-        {
-            block.writePayload(_reusableDataOutput);
-        }
-        catch (IOException e)
-        {
-            throw new ServerScopedRuntimeException(e);
-        }
-
-        final ByteBuffer copy = ByteBuffer.allocate(_reusableDataOutput.length());
-        copy.put(data, 0, _reusableDataOutput.length());
-        copy.flip();
-        return copy;
-    }
-
-
     /**
      * Convenience method that writes a frame to the protocol session. Equivalent to calling
      * getProtocolSession().write().
@@ -1969,11 +1927,6 @@ public class AMQProtocolEngine implement
         return _reference;
     }
 
-    public Lock getReceivedLock()
-    {
-        return _receivedLock;
-    }
-
     @Override
     public long getLastReadTime()
     {
@@ -2095,6 +2048,8 @@ public class AMQProtocolEngine implement
     @Override
     public void processPending()
     {
+
+
         while(_asyncTaskList.peek() != null)
         {
             Action<? super AMQProtocolEngine> asyncAction = _asyncTaskList.poll();
@@ -2103,7 +2058,7 @@ public class AMQProtocolEngine implement
 
         for (AMQSessionModel session : getSessionModels())
         {
-            session.processPendingMessages();
+            session.processPending();
         }
     }
 

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Tue Mar  3 14:15:30 2015
@@ -515,6 +515,7 @@ public abstract class ConsumerTarget_0_8
         if (isAutoClose())
         {
             _needToClose.set(true);
+            getChannel().getConnection().notifyWork();
         }
     }
 
@@ -531,8 +532,6 @@ public abstract class ConsumerTarget_0_8
     public void flushBatched()
     {
         _channel.getConnection().setDeferFlush(false);
-
-        _channel.getConnection().notifyWork();
     }
 
     protected void addUnacknowledgedMessage(MessageInstance entry)

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Tue Mar  3 14:15:30 2015
@@ -552,7 +552,7 @@ public class Connection_1_0 implements C
 
         for (AMQSessionModel session : getSessionModels())
         {
-            session.processPendingMessages();
+            session.processPending();
         }
 
     }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Tue Mar  3 14:15:30 2015
@@ -901,7 +901,7 @@ public class Session_1_0 implements Sess
     }
 
     @Override
-    public void processPendingMessages()
+    public void processPending()
     {
         for(Consumer<?> consumer : getConsumers())
         {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org