You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/03/21 17:37:34 UTC
svn commit: r1788012 - in
/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10:
ServerConnectionDelegate.java ServerSession.java
Author: kwall
Date: Tue Mar 21 17:37:33 2017
New Revision: 1788012
URL: http://svn.apache.org/viewvc?rev=1788012&view=rev
Log:
QPID-7622: [0-10] Fix method ordering in ServerConnectionDelegate/ServerSession. No functional change.
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1788012&r1=1788011&r2=1788012&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Tue Mar 21 17:37:33 2017
@@ -67,6 +67,40 @@ public class ServerConnectionDelegate ex
private boolean _compressionSupported;
private volatile SaslNegotiator _saslNegotiator;
+ enum ConnectionState
+ {
+ INIT,
+ AWAIT_START_OK,
+ AWAIT_SECURE_OK,
+ AWAIT_TUNE_OK,
+ AWAIT_OPEN,
+ OPEN
+ }
+
+ private volatile ConnectionState _state = ConnectionState.INIT;
+ private volatile SubjectAuthenticationResult _successfulAuthenticationResult;
+
+
+ public ServerConnectionDelegate(Broker<?> broker, SubjectCreator subjectCreator)
+ {
+ this(createConnectionProperties(broker), Collections.singletonList((Object)"en_US"), broker, subjectCreator);
+ }
+
+ private ServerConnectionDelegate(Map<String, Object> properties,
+ List<Object> locales,
+ Broker<?> broker,
+ SubjectCreator subjectCreator)
+ {
+ _clientProperties = properties;
+ _mechanisms = (List) subjectCreator.getMechanisms();
+ _locales = locales;
+
+ _broker = broker;
+ _maxNoOfChannels = broker.getConnection_sessionCountLimit();
+ _subjectCreator = subjectCreator;
+ _maximumFrameSize = Math.min(0xffff, broker.getNetworkBufferSize());
+ }
+
public void control(ServerConnection conn, Method method)
{
method.dispatch(conn, this);
@@ -119,39 +153,6 @@ public class ServerConnectionDelegate ex
connection.doHeartBeat();
}
- enum ConnectionState
- {
- INIT,
- AWAIT_START_OK,
- AWAIT_SECURE_OK,
- AWAIT_TUNE_OK,
- AWAIT_OPEN,
- OPEN
- }
-
- private volatile ConnectionState _state = ConnectionState.INIT;
- private volatile SubjectAuthenticationResult _successfulAuthenticationResult;
-
-
- public ServerConnectionDelegate(Broker<?> broker, SubjectCreator subjectCreator)
- {
- this(createConnectionProperties(broker), Collections.singletonList((Object)"en_US"), broker, subjectCreator);
- }
-
- private ServerConnectionDelegate(Map<String, Object> properties,
- List<Object> locales,
- Broker<?> broker,
- SubjectCreator subjectCreator)
- {
- _clientProperties = properties;
- _mechanisms = (List) subjectCreator.getMechanisms();
- _locales = locales;
-
- _broker = broker;
- _maxNoOfChannels = broker.getConnection_sessionCountLimit();
- _subjectCreator = subjectCreator;
- _maximumFrameSize = Math.min(0xffff, broker.getNetworkBufferSize());
- }
public final ConnectionState getState()
@@ -159,7 +160,6 @@ public class ServerConnectionDelegate ex
return _state;
}
-
private void assertState(final ServerConnection conn, final ConnectionState requiredState)
{
if(_state != requiredState)
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1788012&r1=1788011&r2=1788012&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Tue Mar 21 17:37:33 2017
@@ -107,7 +107,6 @@ public class ServerSession extends Sessi
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
- private final long timeout = 60000; // TODO server side close does not require this
// completed incoming commands
private final Object processedLock = new Object();
private final int commandLimit = Integer.getInteger("qpid.session.command_limit", 64 * 1024);
@@ -145,6 +144,39 @@ public class ServerSession extends Sessi
private Map<Integer,ResultFuture<?>> results = new HashMap<Integer,ResultFuture<?>>();
private org.apache.qpid.server.protocol.v0_10.transport.ExecutionException exception = null;
+ private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
+ new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
+
+ private ServerTransaction _transaction;
+ private final AtomicLong _txnStarts = new AtomicLong(0);
+ private final AtomicLong _txnCommits = new AtomicLong(0);
+ private final AtomicLong _txnRejects = new AtomicLong(0);
+
+ private final AtomicLong _txnCount = new AtomicLong(0);
+ private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
+
+ private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_0_10>> _consumers = new CopyOnWriteArrayList<>();
+
+ private AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<LogMessage>();
+ private volatile long _uncommittedMessageSize;
+
+ private final List<StoredMessage<MessageMetaData_0_10>> _uncommittedMessages = new ArrayList<>();
+
+ public ServerSession(ServerConnection connection, ServerSessionDelegate delegate, Binary name, long expiry)
+ {
+ this.connection = connection;
+ this.delegate = delegate;
+ this.name = name;
+ this.closing = false;
+ this._isNoReplay = false;
+ initReceiver();
+ _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this);
+
+ ServerConnection serverConnection = (ServerConnection) connection;
+
+ _blockingTimeout = serverConnection.getBroker().getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
+ }
+
public Binary getName()
{
return name;
@@ -863,39 +895,6 @@ public class ServerSession extends Sessi
void performAction(MessageDispositionChangeListener listener);
}
- private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
- new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
-
- private ServerTransaction _transaction;
- private final AtomicLong _txnStarts = new AtomicLong(0);
- private final AtomicLong _txnCommits = new AtomicLong(0);
- private final AtomicLong _txnRejects = new AtomicLong(0);
-
- private final AtomicLong _txnCount = new AtomicLong(0);
- private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
-
- private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_0_10>> _consumers = new CopyOnWriteArrayList<>();
-
- private AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<LogMessage>();
- private volatile long _uncommittedMessageSize;
-
- private final List<StoredMessage<MessageMetaData_0_10>> _uncommittedMessages = new ArrayList<>();
-
- public ServerSession(ServerConnection connection, ServerSessionDelegate delegate, Binary name, long expiry)
- {
- this.connection = connection;
- this.delegate = delegate;
- this.name = name;
- this.closing = false;
- this._isNoReplay = false;
- initReceiver();
- _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this);
-
- ServerConnection serverConnection = (ServerConnection) connection;
-
- _blockingTimeout = serverConnection.getBroker().getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
- }
-
public Subject getSubject()
{
return _modelObject.getSubject();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org