You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/07 18:20:01 UTC
svn commit: r1768568 - in /qpid/java/branches/remove-queue-runner:
broker-core/src/main/java/org/apache/qpid/server/virtualhost/
broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/
broker-plugins/amqp-0-8-protocol/src...
Author: rgodfrey
Date: Mon Nov 7 18:20:01 2016
New Revision: 1768568
URL: http://svn.apache.org/viewvc?rev=1768568&view=rev
Log:
Limit process pending iteration to sessions which have been notified that they have work
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1768568&r1=1768567&r2=1768568&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Mon Nov 7 18:20:01 2016
@@ -122,12 +122,31 @@ public abstract class AbstractSystemMess
{
_name = consumerName;
_target = target;
+ _target.addStateListener(new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
+ {
+ @Override
+ public void stateChanged(final ConsumerTarget object,
+ final ConsumerTarget.State oldState,
+ final ConsumerTarget.State newState)
+ {
+ if(newState == ConsumerTarget.State.ACTIVE)
+ {
+ if(!_queue.isEmpty())
+ {
+ _target.notifyWork();
+ }
+ }
+ }
+ });
}
@Override
public void externalStateChange()
{
-
+ if(!_queue.isEmpty())
+ {
+ _target.notifyWork();
+ }
}
@Override
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java?rev=1768568&r1=1768567&r2=1768568&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java Mon Nov 7 18:20:01 2016
@@ -66,10 +66,10 @@ public class VirtualHostPropertiesNode e
Map<String, Object> headers = new HashMap<>();
- final List<String> globalAddresseDomains = _addressSpace.getGlobalAddressDomains();
- if (globalAddresseDomains != null && !globalAddresseDomains.isEmpty())
+ final List<String> globalAddressDomains = _addressSpace.getGlobalAddressDomains();
+ if (globalAddressDomains != null && !globalAddressDomains.isEmpty())
{
- String primaryDomain = globalAddresseDomains.get(0);
+ String primaryDomain = globalAddressDomains.get(0);
if(primaryDomain != null)
{
primaryDomain = primaryDomain.trim();
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1768568&r1=1768567&r2=1768568&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Mon Nov 7 18:20:01 2016
@@ -27,6 +27,8 @@ import java.security.PrivilegedAction;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -68,6 +70,9 @@ public class AMQPConnection_0_10 extends
private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
private ServerDisassembler _disassembler;
+ private final Set<AMQSessionModel<?>> _sessionsWithWork =
+ Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?>, Boolean>());
+
public AMQPConnection_0_10(final Broker<?> broker,
ServerNetworkConnection network,
@@ -251,7 +256,7 @@ public class AMQPConnection_0_10 extends
{
if (isIOThread())
{
- return _connection.processPendingIterator();
+ return _connection.processPendingIterator(_sessionsWithWork);
}
else
{
@@ -280,6 +285,7 @@ public class AMQPConnection_0_10 extends
@Override
public void notifyWork(final AMQSessionModel<?> sessionModel)
{
+ _sessionsWithWork.add(sessionModel);
notifyWork();
}
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1768568&r1=1768567&r2=1768568&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Mon Nov 7 18:20:01 2016
@@ -27,12 +27,12 @@ import java.security.AccessControlContex
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
@@ -42,13 +42,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.model.NamedAddressSpace;
-import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -491,18 +491,18 @@ public class ServerConnection extends Co
}
}
- public Iterator<Runnable> processPendingIterator()
+ public Iterator<Runnable> processPendingIterator(final Set<AMQSessionModel<?>> sessionsWithWork)
{
- return new ProcessPendingIterator();
+ return new ProcessPendingIterator(sessionsWithWork);
}
private class ProcessPendingIterator implements Iterator<Runnable>
{
- private final Collection<? extends ServerSession> _sessionsWithPending;
+ private final Collection<AMQSessionModel<?>> _sessionsWithPending;
private Iterator<? extends AMQSessionModel<?>> _sessionIterator;
- private ProcessPendingIterator()
+ private ProcessPendingIterator(final Set<AMQSessionModel<?>> sessionsWithWork)
{
- _sessionsWithPending = new ArrayList<>(getSessionModels());
+ _sessionsWithPending = sessionsWithWork;
_sessionIterator = _sessionsWithPending.iterator();
}
@@ -527,9 +527,10 @@ public class ServerConnection extends Co
@Override
public void run()
{
- if(!session.processPending())
+ _sessionIterator.remove();
+ if(session.processPending())
{
- _sessionIterator.remove();
+ _sessionsWithPending.add(session);
}
}
};
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java?rev=1768568&r1=1768567&r2=1768568&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java Mon Nov 7 18:20:01 2016
@@ -30,7 +30,6 @@ import java.security.AccessControlExcept
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@@ -165,6 +164,10 @@ public class AMQPConnection_0_8Impl
private volatile boolean _transportBlockedForWriting;
private volatile SubjectAuthenticationResult _successfulAuthenticationResult;
+ private final Set<AMQSessionModel<?>> _sessionsWithWork =
+ Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?>, Boolean>());
+
+
public AMQPConnection_0_8Impl(Broker<?> broker,
ServerNetworkConnection network,
AmqpPort<?> port,
@@ -1395,6 +1398,7 @@ public class AMQPConnection_0_8Impl
@Override
public void notifyWork(final AMQSessionModel<?> sessionModel)
{
+ _sessionsWithWork.add(sessionModel);
notifyWork();
}
@@ -1422,28 +1426,27 @@ public class AMQPConnection_0_8Impl
private class ProcessPendingIterator implements Iterator<Runnable>
{
- private final Collection<? extends AMQChannel> _sessionsWithPending;
private Iterator<? extends AMQSessionModel<?>> _sessionIterator;
+
private ProcessPendingIterator()
{
- _sessionsWithPending = new ArrayList<>(getSessionModels());
- _sessionIterator = _sessionsWithPending.iterator();
+ _sessionIterator = _sessionsWithWork.iterator();
}
@Override
public boolean hasNext()
{
- return !(_sessionsWithPending.isEmpty() && _asyncTaskList.isEmpty());
+ return !(_sessionsWithWork.isEmpty() && _asyncTaskList.isEmpty());
}
@Override
public Runnable next()
{
- if(!_sessionsWithPending.isEmpty())
+ if(!_sessionsWithWork.isEmpty())
{
if(!_sessionIterator.hasNext())
{
- _sessionIterator = _sessionsWithPending.iterator();
+ _sessionIterator = _sessionsWithWork.iterator();
}
final AMQSessionModel<?> session = _sessionIterator.next();
return new Runnable()
@@ -1451,9 +1454,11 @@ public class AMQPConnection_0_8Impl
@Override
public void run()
{
- if(!session.processPending())
+ _sessionIterator.remove();
+
+ if(session.processPending())
{
- _sessionIterator.remove();
+ _sessionsWithWork.add(session);
}
}
};
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1768568&r1=1768567&r2=1768568&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Mon Nov 7 18:20:01 2016
@@ -36,6 +36,8 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -233,7 +235,8 @@ public class AMQPConnection_1_0 extends
private boolean _closedOnOpen;
-
+ private final Set<AMQSessionModel<?>> _sessionsWithWork =
+ Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?>, Boolean>());
AMQPConnection_1_0(final Broker<?> broker,
final ServerNetworkConnection network,
@@ -1379,6 +1382,7 @@ public class AMQPConnection_1_0 extends
@Override
public void notifyWork(final AMQSessionModel<?> sessionModel)
{
+ _sessionsWithWork.add(sessionModel);
notifyWork();
}
@@ -1539,28 +1543,26 @@ public class AMQPConnection_1_0 extends
private class ProcessPendingIterator implements Iterator<Runnable>
{
- private final Collection<? extends AMQSessionModel<?>> _sessionsWithPending;
private Iterator<? extends AMQSessionModel<?>> _sessionIterator;
private ProcessPendingIterator()
{
- _sessionsWithPending = new ArrayList<>(getSessionModels());
- _sessionIterator = _sessionsWithPending.iterator();
+ _sessionIterator = _sessionsWithWork.iterator();
}
@Override
public boolean hasNext()
{
- return !(_sessionsWithPending.isEmpty() && _asyncTaskList.isEmpty());
+ return !(_sessionsWithWork.isEmpty() && _asyncTaskList.isEmpty());
}
@Override
public Runnable next()
{
- if(!_sessionsWithPending.isEmpty())
+ if(!_sessionsWithWork.isEmpty())
{
if(!_sessionIterator.hasNext())
{
- _sessionIterator = _sessionsWithPending.iterator();
+ _sessionIterator = _sessionsWithWork.iterator();
}
final AMQSessionModel<?> session = _sessionIterator.next();
return new Runnable()
@@ -1568,9 +1570,10 @@ public class AMQPConnection_1_0 extends
@Override
public void run()
{
- if(!session.processPending())
+ _sessionIterator.remove();
+ if(session.processPending())
{
- _sessionIterator.remove();
+ _sessionsWithWork.add(session);
}
}
};
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1768568&r1=1768567&r2=1768568&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Mon Nov 7 18:20:01 2016
@@ -61,7 +61,10 @@ class ManagementNodeConsumer implements
@Override
public void externalStateChange()
{
-
+ if(!_queue.isEmpty())
+ {
+ _target.notifyWork();
+ }
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org