You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/07 18:20:01 UTC

svn commit: r1768568 - in /qpid/java/branches/remove-queue-runner: broker-core/src/main/java/org/apache/qpid/server/virtualhost/ broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ broker-plugins/amqp-0-8-protocol/src...

Author: rgodfrey
Date: Mon Nov  7 18:20:01 2016
New Revision: 1768568

URL: http://svn.apache.org/viewvc?rev=1768568&view=rev
Log:
Limit the process-pending iteration to only those sessions that have been notified that they have work

Modified:
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
    qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1768568&r1=1768567&r2=1768568&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Mon Nov  7 18:20:01 2016
@@ -122,12 +122,31 @@ public abstract class AbstractSystemMess
         {
             _name = consumerName;
             _target = target;
+            _target.addStateListener(new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
+            {
+                @Override
+                public void stateChanged(final ConsumerTarget object,
+                                         final ConsumerTarget.State oldState,
+                                         final ConsumerTarget.State newState)
+                {
+                    if(newState == ConsumerTarget.State.ACTIVE)
+                    {
+                        if(!_queue.isEmpty())
+                        {
+                            _target.notifyWork();
+                        }
+                    }
+                }
+            });
         }
 
         @Override
         public void externalStateChange()
         {
-
+            if(!_queue.isEmpty())
+            {
+                _target.notifyWork();
+            }
         }
 
         @Override

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java?rev=1768568&r1=1768567&r2=1768568&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java Mon Nov  7 18:20:01 2016
@@ -66,10 +66,10 @@ public class VirtualHostPropertiesNode e
 
         Map<String, Object> headers = new HashMap<>();
 
-        final List<String> globalAddresseDomains = _addressSpace.getGlobalAddressDomains();
-        if (globalAddresseDomains != null && !globalAddresseDomains.isEmpty())
+        final List<String> globalAddressDomains = _addressSpace.getGlobalAddressDomains();
+        if (globalAddressDomains != null && !globalAddressDomains.isEmpty())
         {
-            String primaryDomain = globalAddresseDomains.get(0);
+            String primaryDomain = globalAddressDomains.get(0);
             if(primaryDomain != null)
             {
                 primaryDomain = primaryDomain.trim();

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1768568&r1=1768567&r2=1768568&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Mon Nov  7 18:20:01 2016
@@ -27,6 +27,8 @@ import java.security.PrivilegedAction;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -68,6 +70,9 @@ public class AMQPConnection_0_10 extends
     private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
     private ServerDisassembler _disassembler;
 
+    private final Set<AMQSessionModel<?>> _sessionsWithWork =
+            Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?>, Boolean>());
+
 
     public AMQPConnection_0_10(final Broker<?> broker,
                                ServerNetworkConnection network,
@@ -251,7 +256,7 @@ public class AMQPConnection_0_10 extends
     {
         if (isIOThread())
         {
-            return _connection.processPendingIterator();
+            return _connection.processPendingIterator(_sessionsWithWork);
         }
         else
         {
@@ -280,6 +285,7 @@ public class AMQPConnection_0_10 extends
     @Override
     public void notifyWork(final AMQSessionModel<?> sessionModel)
     {
+        _sessionsWithWork.add(sessionModel);
         notifyWork();
     }
 

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1768568&r1=1768567&r2=1768568&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Mon Nov  7 18:20:01 2016
@@ -27,12 +27,12 @@ import java.security.AccessControlContex
 import java.security.AccessController;
 import java.security.Principal;
 import java.security.PrivilegedAction;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -42,13 +42,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.model.NamedAddressSpace;
-import org.apache.qpid.server.protocol.ConnectionClosingTicker;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.ConnectionClosingTicker;
 import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -491,18 +491,18 @@ public class ServerConnection extends Co
         }
     }
 
-    public Iterator<Runnable> processPendingIterator()
+    public Iterator<Runnable> processPendingIterator(final Set<AMQSessionModel<?>> sessionsWithWork)
     {
-        return new ProcessPendingIterator();
+        return new ProcessPendingIterator(sessionsWithWork);
     }
 
     private class ProcessPendingIterator implements Iterator<Runnable>
     {
-        private final Collection<? extends ServerSession> _sessionsWithPending;
+        private final Collection<AMQSessionModel<?>> _sessionsWithPending;
         private Iterator<? extends AMQSessionModel<?>> _sessionIterator;
-        private ProcessPendingIterator()
+        private ProcessPendingIterator(final Set<AMQSessionModel<?>> sessionsWithWork)
         {
-            _sessionsWithPending = new ArrayList<>(getSessionModels());
+            _sessionsWithPending = sessionsWithWork;
             _sessionIterator = _sessionsWithPending.iterator();
         }
 
@@ -527,9 +527,10 @@ public class ServerConnection extends Co
                     @Override
                     public void run()
                     {
-                        if(!session.processPending())
+                        _sessionIterator.remove();
+                        if(session.processPending())
                         {
-                            _sessionIterator.remove();
+                            _sessionsWithPending.add(session);
                         }
                     }
                 };

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java?rev=1768568&r1=1768567&r2=1768568&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java Mon Nov  7 18:20:01 2016
@@ -30,7 +30,6 @@ import java.security.AccessControlExcept
 import java.security.AccessController;
 import java.security.Principal;
 import java.security.PrivilegedAction;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -165,6 +164,10 @@ public class AMQPConnection_0_8Impl
     private volatile boolean _transportBlockedForWriting;
     private volatile SubjectAuthenticationResult _successfulAuthenticationResult;
 
+    private final Set<AMQSessionModel<?>> _sessionsWithWork =
+            Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?>, Boolean>());
+
+
     public AMQPConnection_0_8Impl(Broker<?> broker,
                                   ServerNetworkConnection network,
                                   AmqpPort<?> port,
@@ -1395,6 +1398,7 @@ public class AMQPConnection_0_8Impl
     @Override
     public void notifyWork(final AMQSessionModel<?> sessionModel)
     {
+        _sessionsWithWork.add(sessionModel);
         notifyWork();
     }
 
@@ -1422,28 +1426,27 @@ public class AMQPConnection_0_8Impl
 
     private class ProcessPendingIterator implements Iterator<Runnable>
     {
-        private final Collection<? extends AMQChannel> _sessionsWithPending;
         private Iterator<? extends AMQSessionModel<?>> _sessionIterator;
+
         private ProcessPendingIterator()
         {
-            _sessionsWithPending = new ArrayList<>(getSessionModels());
-            _sessionIterator = _sessionsWithPending.iterator();
+            _sessionIterator = _sessionsWithWork.iterator();
         }
 
         @Override
         public boolean hasNext()
         {
-            return !(_sessionsWithPending.isEmpty() && _asyncTaskList.isEmpty());
+            return !(_sessionsWithWork.isEmpty() && _asyncTaskList.isEmpty());
         }
 
         @Override
         public Runnable next()
         {
-            if(!_sessionsWithPending.isEmpty())
+            if(!_sessionsWithWork.isEmpty())
             {
                 if(!_sessionIterator.hasNext())
                 {
-                    _sessionIterator = _sessionsWithPending.iterator();
+                    _sessionIterator = _sessionsWithWork.iterator();
                 }
                 final AMQSessionModel<?> session = _sessionIterator.next();
                 return new Runnable()
@@ -1451,9 +1454,11 @@ public class AMQPConnection_0_8Impl
                     @Override
                     public void run()
                     {
-                        if(!session.processPending())
+                        _sessionIterator.remove();
+
+                        if(session.processPending())
                         {
-                            _sessionIterator.remove();
+                            _sessionsWithWork.add(session);
                         }
                     }
                 };

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1768568&r1=1768567&r2=1768568&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Mon Nov  7 18:20:01 2016
@@ -36,6 +36,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -233,7 +235,8 @@ public class AMQPConnection_1_0 extends
 
     private boolean _closedOnOpen;
 
-
+    private final Set<AMQSessionModel<?>> _sessionsWithWork =
+            Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?>, Boolean>());
 
     AMQPConnection_1_0(final Broker<?> broker,
                        final ServerNetworkConnection network,
@@ -1379,6 +1382,7 @@ public class AMQPConnection_1_0 extends
     @Override
     public void notifyWork(final AMQSessionModel<?> sessionModel)
     {
+        _sessionsWithWork.add(sessionModel);
         notifyWork();
     }
 
@@ -1539,28 +1543,26 @@ public class AMQPConnection_1_0 extends
 
     private class ProcessPendingIterator implements Iterator<Runnable>
     {
-        private final Collection<? extends AMQSessionModel<?>> _sessionsWithPending;
         private Iterator<? extends AMQSessionModel<?>> _sessionIterator;
         private ProcessPendingIterator()
         {
-            _sessionsWithPending = new ArrayList<>(getSessionModels());
-            _sessionIterator = _sessionsWithPending.iterator();
+            _sessionIterator = _sessionsWithWork.iterator();
         }
 
         @Override
         public boolean hasNext()
         {
-            return !(_sessionsWithPending.isEmpty() && _asyncTaskList.isEmpty());
+            return !(_sessionsWithWork.isEmpty() && _asyncTaskList.isEmpty());
         }
 
         @Override
         public Runnable next()
         {
-            if(!_sessionsWithPending.isEmpty())
+            if(!_sessionsWithWork.isEmpty())
             {
                 if(!_sessionIterator.hasNext())
                 {
-                    _sessionIterator = _sessionsWithPending.iterator();
+                    _sessionIterator = _sessionsWithWork.iterator();
                 }
                 final AMQSessionModel<?> session = _sessionIterator.next();
                 return new Runnable()
@@ -1568,9 +1570,10 @@ public class AMQPConnection_1_0 extends
                     @Override
                     public void run()
                     {
-                        if(!session.processPending())
+                        _sessionIterator.remove();
+                        if(session.processPending())
                         {
-                            _sessionIterator.remove();
+                            _sessionsWithWork.add(session);
                         }
                     }
                 };

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1768568&r1=1768567&r2=1768568&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Mon Nov  7 18:20:01 2016
@@ -61,7 +61,10 @@ class ManagementNodeConsumer implements
     @Override
     public void externalStateChange()
     {
-
+        if(!_queue.isEmpty())
+        {
+            _target.notifyWork();
+        }
     }
 
     @Override



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org