You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/01/24 13:46:13 UTC
svn commit: r1780077 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/session/
broker-core/src/main/java/org/apache/qpid/server/transport/
broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/
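broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/
broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/
broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/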
Author: kwall
Date: Tue Jan 24 13:46:13 2017
New Revision: 1780077
URL: http://svn.apache.org/viewvc?rev=1780077&view=rev
Log:
QPID-7633: Pull up the processPendingIterator and associated methods.
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java?rev=1780077&r1=1780076&r2=1780077&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java Tue Jan 24 13:46:13 2017
@@ -25,8 +25,10 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.security.auth.Subject;
@@ -36,7 +38,9 @@ import com.google.common.util.concurrent
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.logging.LogSubject;
@@ -76,10 +80,15 @@ public abstract class AbstractAMQPSessio
protected final SecurityToken _token;
protected final PublishAuthorisationCache _publishAuthCache;
+ protected final long _maxUncommittedInMemorySize;
+
protected final LogSubject _logSubject;
protected final List<Action<? super S>> _taskList = new CopyOnWriteArrayList<>();
+ protected final Set<AbstractConsumerTarget> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
+ private Iterator<AbstractConsumerTarget> _processPendingIterator;
+
protected AbstractAMQPSession(final Connection<?> parent, final int sessionId)
{
super(parent, createAttributes(sessionId));
@@ -115,6 +124,9 @@ public abstract class AbstractAMQPSessio
final long authCacheTimeout = _connection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT);
final int authCacheSize = _connection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE);
_publishAuthCache = new PublishAuthorisationCache(_token, authCacheTimeout, authCacheSize);
+
+ _maxUncommittedInMemorySize = _connection.getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
+
_logSubject = new ChannelLogSubject(this);
setState(State.ACTIVE);
@@ -349,9 +361,19 @@ public abstract class AbstractAMQPSessio
}
}
- public abstract void addTicker(final Ticker ticker);
+ @Override
+ public void addTicker(final Ticker ticker)
+ {
+ _connection.getAggregateTicker().addTicker(ticker);
+ // trigger a wakeup to ensure the ticker will be taken into account
+ getAMQPConnection().notifyWork();
+ }
- public abstract void removeTicker(final Ticker ticker);
+ @Override
+ public void removeTicker(final Ticker ticker)
+ {
+ _connection.getAggregateTicker().removeTicker(ticker);
+ }
public abstract void doTimeoutAction(final String idleTransactionTimeoutError);
@@ -364,11 +386,54 @@ public abstract class AbstractAMQPSessio
public abstract long getTransactionStartTimeLong();
-
@Override
protected void logOperation(final String operation)
{
getEventLogger().message(ChannelMessages.OPERATION(operation));
}
+ @Override
+ public boolean processPending()
+ {
+ if (!getAMQPConnection().isIOThread() || isClosing())
+ {
+ return false;
+ }
+
+ updateBlockedStateIfNecessary();
+
+ if(!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
+ {
+ if (_processPendingIterator == null || !_processPendingIterator.hasNext())
+ {
+ _processPendingIterator = _consumersWithPendingWork.iterator();
+ }
+
+ if(_processPendingIterator.hasNext())
+ {
+ AbstractConsumerTarget target = _processPendingIterator.next();
+ _processPendingIterator.remove();
+ if (target.processPending())
+ {
+ _consumersWithPendingWork.add(target);
+ }
+ }
+ }
+
+ return !_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting();
+ }
+
+ public void notifyWork(final X target)
+ {
+ if(_consumersWithPendingWork.add((AbstractConsumerTarget) target))
+ {
+ getAMQPConnection().notifyWork(this);
+ }
+ }
+
+ public abstract void transportStateChanged();
+
+ protected abstract void updateBlockedStateIfNecessary();
+
+ public abstract boolean isClosing();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java?rev=1780077&r1=1780076&r2=1780077&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java Tue Jan 24 13:46:13 2017
@@ -75,6 +75,7 @@ public interface AMQPConnection<C extend
boolean hasSessionWithName(byte[] name);
+ AggregateTicker getAggregateTicker();
enum CloseReason
{
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1780077&r1=1780076&r2=1780077&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Tue Jan 24 13:46:13 2017
@@ -482,7 +482,7 @@ public class ServerConnection extends Co
{
for (ServerSession ssn : getSessionModels())
{
- ssn.transportStateChanged();
+ ssn.getModelObject().transportStateChanged();
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1780077&r1=1780076&r2=1780077&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Tue Jan 24 13:46:13 2017
@@ -54,11 +54,9 @@ import com.google.common.util.concurrent
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
-import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
@@ -93,10 +91,8 @@ import org.apache.qpid.server.txn.Suspen
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.Deletable;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.*;
-import org.apache.qpid.transport.network.Ticker;
public class ServerSession extends Session
implements LogSubject, AsyncAutoCommitTransaction.FutureRecorder
@@ -125,8 +121,6 @@ public class ServerSession extends Sessi
private long _blockTime;
private long _blockingTimeout;
private boolean _wireBlockingState;
- private final Set<ConsumerTarget_0_10> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
- private Iterator<ConsumerTarget_0_10> _processPendingIterator;
public static interface MessageDispositionChangeListener
{
@@ -154,13 +148,10 @@ public class ServerSession extends Sessi
private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_0_10>> _consumers = new CopyOnWriteArrayList<>();
- private final List<Action<? super ServerSession>> _taskList = new CopyOnWriteArrayList<Action<? super ServerSession>>();
-
private AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<LogMessage>();
private volatile long _uncommittedMessageSize;
private final List<StoredMessage<MessageMetaData_0_10>> _uncommittedMessages = new ArrayList<>();
- private long _maxUncommittedInMemorySize;
public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
@@ -170,7 +161,6 @@ public class ServerSession extends Sessi
ServerConnection serverConnection = (ServerConnection) connection;
_blockingTimeout = serverConnection.getBroker().getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
- _maxUncommittedInMemorySize = getAMQPConnection().getContextProvider().getContextValue(Long.class, org.apache.qpid.server.model.Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
}
public AccessControlContext getAccessControllerContext()
@@ -832,16 +822,24 @@ public class ServerSession extends Sessi
return b;
}
- public void transportStateChanged()
+ public void updateBlockedStateIfNecesssary()
{
- for(ConsumerTarget_0_10 consumerTarget : getSubscriptions())
- {
- consumerTarget.transportStateChanged();
- }
- if (!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
+ boolean desiredBlockingState = _blocking.get();
+ if (desiredBlockingState != _wireBlockingState)
{
- getAMQPConnection().notifyWork(_modelObject);
+ _wireBlockingState = desiredBlockingState;
+
+ if (desiredBlockingState)
+ {
+ invokeBlock();
+ }
+ else
+ {
+ invokeUnblock();
+ }
+ _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
}
+
}
public Object getConnectionReference()
@@ -1100,70 +1098,6 @@ public class ServerSession extends Sessi
}
}
- public boolean processPending()
- {
- if (!getAMQPConnection().isIOThread() || isClosing())
- {
- return false;
- }
-
- boolean desiredBlockingState = _blocking.get();
- if (desiredBlockingState != _wireBlockingState)
- {
- _wireBlockingState = desiredBlockingState;
-
- if (desiredBlockingState)
- {
- invokeBlock();
- }
- else
- {
- invokeUnblock();
- }
- _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
- }
-
- if (!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
- {
- if (_processPendingIterator == null || !_processPendingIterator.hasNext())
- {
- _processPendingIterator = _consumersWithPendingWork.iterator();
- }
-
- if (_processPendingIterator.hasNext())
- {
- ConsumerTarget_0_10 target = _processPendingIterator.next();
- _processPendingIterator.remove();
- if (target.processPending())
- {
- _consumersWithPendingWork.add(target);
- }
- }
- }
-
- return !_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting();
- }
-
- public void addTicker(final Ticker ticker)
- {
- getAMQPConnection().getAggregateTicker().addTicker(ticker);
- // trigger a wakeup to ensure the ticker will be taken into account
- getAMQPConnection().notifyWork();
- }
-
- public void removeTicker(final Ticker ticker)
- {
- getAMQPConnection().getAggregateTicker().removeTicker(ticker);
- }
-
- public void notifyWork(final ConsumerTarget_0_10 target)
- {
- if(_consumersWithPendingWork.add(target))
- {
- getAMQPConnection().notifyWork(_modelObject);
- }
- }
-
public void doTimeoutAction(final String reason)
{
getAMQPConnection().closeSessionAsync(_modelObject,
@@ -1172,7 +1106,7 @@ public class ServerSession extends Sessi
public final long getMaxUncommittedInMemorySize()
{
- return _maxUncommittedInMemorySize;
+ return _modelObject.getMaxUncommittedInMemorySize();
}
public int compareTo(AMQSessionModel o)
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java?rev=1780077&r1=1780076&r2=1780077&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java Tue Jan 24 13:46:13 2017
@@ -35,9 +35,7 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.protocol.PublishAuthorisationCache;
import org.apache.qpid.server.session.AbstractAMQPSession;
-import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.util.Action;
-import org.apache.qpid.transport.network.Ticker;
public class Session_0_10 extends AbstractAMQPSession<Session_0_10, ConsumerTarget_0_10>
implements AMQSessionModel<Session_0_10, ConsumerTarget_0_10>, LogSubject
@@ -115,19 +113,20 @@ public class Session_0_10 extends Abstra
@Override
public void transportStateChanged()
{
- _serverSession.transportStateChanged();
+ for(ConsumerTarget_0_10 consumerTarget : _serverSession.getSubscriptions())
+ {
+ consumerTarget.transportStateChanged();
+ }
+ if (!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
+ {
+ getAMQPConnection().notifyWork(this);
+ }
}
@Override
- public boolean processPending()
+ protected void updateBlockedStateIfNecessary()
{
- return _serverSession.processPending();
- }
-
- @Override
- public void notifyWork(final ConsumerTarget_0_10 target)
- {
- _serverSession.notifyWork(target);
+ _serverSession.updateBlockedStateIfNecesssary();
}
@Override
@@ -179,18 +178,6 @@ public class Session_0_10 extends Abstra
}
@Override
- public void addTicker(final Ticker ticker)
- {
- _serverSession.addTicker(ticker);
- }
-
- @Override
- public void removeTicker(final Ticker ticker)
- {
- _serverSession.removeTicker(ticker);
- }
-
- @Override
public void doTimeoutAction(final String idleTransactionTimeoutError)
{
_serverSession.doTimeoutAction(idleTransactionTimeoutError);
@@ -242,4 +229,9 @@ public class Session_0_10 extends Abstra
{
return _serverSession;
}
+
+ public long getMaxUncommittedInMemorySize()
+ {
+ return _maxUncommittedInMemorySize;
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1780077&r1=1780076&r2=1780077&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Tue Jan 24 13:46:13 2017
@@ -32,7 +32,6 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -60,20 +59,16 @@ import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.ErrorCodes;
import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.ConsumerTarget;
-import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.ArrivalTimeFilter;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.logging.LogMessage;
-import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
-import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
@@ -103,7 +98,6 @@ import org.apache.qpid.server.util.Serve
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
-import org.apache.qpid.transport.network.Ticker;
public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0_8>
implements AMQSessionModel<AMQChannel, ConsumerTarget_0_8>,
@@ -155,10 +149,6 @@ public class AMQChannel extends Abstract
/** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
private final Map<AMQShortString, ConsumerTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, ConsumerTarget_0_8>();
- private final Set<ConsumerTarget_0_8> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
-
- private Iterator<ConsumerTarget_0_8> _processPendingIterator;
-
private final MessageStore _messageStore;
private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>();
@@ -205,7 +195,6 @@ public class AMQChannel extends Abstract
private long _confirmedMessageCounter;
private volatile long _uncommittedMessageSize;
private final List<StoredMessage<MessageMetaData>> _uncommittedMessages = new ArrayList<>();
- private long _maxUncommittedInMemorySize;
private boolean _wireBlockingState;
@@ -231,8 +220,6 @@ public class AMQChannel extends Abstract
_connection = connection;
_channelId = channelId;
-
- _maxUncommittedInMemorySize = connection.getContextProvider().getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
_messageStore = messageStore;
_blockingTimeout = connection.getBroker().getContextValue(Long.class,
Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
@@ -1294,7 +1281,8 @@ public class AMQChannel extends Abstract
return "("+ _suspended.get() + ", " + _closing.get() + ", " + _connection.isClosing() + ") "+"["+ _connection.toString()+":"+_channelId+"]";
}
- boolean isClosing()
+ @Override
+ public boolean isClosing()
{
return _closing.get();
}
@@ -3666,14 +3654,8 @@ public class AMQChannel extends Abstract
}
}
- @Override
- public boolean processPending()
+ protected void updateBlockedStateIfNecessary()
{
- if (!getAMQPConnection().isIOThread() || isClosing())
- {
- return false;
- }
-
boolean desiredBlockingState = _blocking.get();
if (desiredBlockingState != _wireBlockingState)
{
@@ -3681,49 +3663,6 @@ public class AMQChannel extends Abstract
sendFlow(!desiredBlockingState);
_blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
}
-
- if(!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
- {
- if (_processPendingIterator == null || !_processPendingIterator.hasNext())
- {
- _processPendingIterator = _consumersWithPendingWork.iterator();
- }
-
- if(_processPendingIterator.hasNext())
- {
- ConsumerTarget_0_8 target = _processPendingIterator.next();
- _processPendingIterator.remove();
- if (target.processPending())
- {
- _consumersWithPendingWork.add(target);
- }
- }
- }
-
- return !_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting();
- }
-
- @Override
- public void addTicker(final Ticker ticker)
- {
- getConnection().getAggregateTicker().addTicker(ticker);
- // trigger a wakeup to ensure the ticker will be taken into account
- getAMQPConnection().notifyWork();
- }
-
- @Override
- public void removeTicker(final Ticker ticker)
- {
- getConnection().getAggregateTicker().removeTicker(ticker);
- }
-
- @Override
- public void notifyWork(final ConsumerTarget_0_8 target)
- {
- if(_consumersWithPendingWork.add(target))
- {
- getAMQPConnection().notifyWork(this);
- }
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java?rev=1780077&r1=1780076&r2=1780077&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java Tue Jan 24 13:46:13 2017
@@ -123,6 +123,7 @@ public class AMQChannelTest extends Qpid
when(_amqConnection.getContextProvider()).thenReturn(_virtualHost);
when(_amqConnection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Session.PRODUCER_AUTH_CACHE_TIMEOUT_DEFAULT);
when(_amqConnection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Session.PRODUCER_AUTH_CACHE_SIZE_DEFAULT);
+ when(_amqConnection.getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE)).thenReturn(Connection.DEFAULT_MAX_UNCOMMITTED_IN_MEMORY_SIZE);
when(_amqConnection.getTaskExecutor()).thenReturn(taskExecutor);
when(_amqConnection.getChildExecutor()).thenReturn(taskExecutor);
when(_amqConnection.getModel()).thenReturn(BrokerModel.getInstance());
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1780077&r1=1780076&r2=1780077&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Tue Jan 24 13:46:13 2017
@@ -125,8 +125,11 @@ import org.apache.qpid.server.txn.AutoCo
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+<<<<<<< HEAD
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.transport.network.Ticker;
+=======
+>>>>>>> QPID-7633: Pull up the processPendingIterator and associated methods.
public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget_1_0>
implements AMQSessionModel<Session_1_0, ConsumerTarget_1_0>, LogSubject
@@ -149,8 +152,6 @@ public class Session_1_0 extends Abstrac
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
private Session<?> _modelObject = this;
- private final Set<ConsumerTarget_1_0> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
- private Iterator<ConsumerTarget_1_0> _processPendingIterator;
private SessionState _sessionState;
@@ -1920,56 +1921,15 @@ public class Session_1_0 extends Abstrac
}
@Override
- public boolean processPending()
+ protected void updateBlockedStateIfNecessary()
{
- if (!getAMQPConnection().isIOThread() || END_STATES.contains(getSessionState()))
- {
- return false;
- }
-
-
- if(!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
- {
- if (_processPendingIterator == null || !_processPendingIterator.hasNext())
- {
- _processPendingIterator = _consumersWithPendingWork.iterator();
- }
-
- if(_processPendingIterator.hasNext())
- {
- ConsumerTarget_1_0 target = _processPendingIterator.next();
- _processPendingIterator.remove();
- if (target.processPending())
- {
- _consumersWithPendingWork.add(target);
- }
- }
- }
-
- return !_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting();
- }
-
- @Override
- public void addTicker(final Ticker ticker)
- {
- getConnection().getAggregateTicker().addTicker(ticker);
- // trigger a wakeup to ensure the ticker will be taken into account
- getAMQPConnection().notifyWork();
- }
- @Override
- public void removeTicker(final Ticker ticker)
- {
- getConnection().getAggregateTicker().removeTicker(ticker);
}
@Override
- public void notifyWork(final ConsumerTarget_1_0 target)
+ public boolean isClosing()
{
- if(_consumersWithPendingWork.add(target))
- {
- getAMQPConnection().notifyWork(this);
- }
+ return END_STATES.contains(getSessionState());
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org