You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/07/13 11:47:58 UTC
qpid-broker-j git commit: QPID-7856: Move connection related
attributes, currently on Broker, to AMQP port.
[Java Broker] Convert broker connection related attributes into context
variables and expose them on amqp port as derived attributes
Repository: qpid-broker-j
Updated Branches:
refs/heads/master d63cab001 -> 8d574dc58
QPID-7856: Move connection related attributes, currently on Broker, to AMQP port. [Java Broker] Convert broker connection related attributes into context variables and expose them on amqp port as derived attributes
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/8d574dc5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/8d574dc5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/8d574dc5
Branch: refs/heads/master
Commit: 8d574dc58da8e53b66ec2b296665e214dc458e39
Parents: d63cab0
Author: Alex Rudyy <or...@apache.org>
Authored: Thu Jul 13 12:19:28 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Thu Jul 13 12:19:28 2017 +0100
----------------------------------------------------------------------
.../org/apache/qpid/server/model/Broker.java | 16 ------
.../apache/qpid/server/model/BrokerImpl.java | 27 +--------
.../apache/qpid/server/model/Connection.java | 2 +-
.../org/apache/qpid/server/model/Session.java | 1 -
.../apache/qpid/server/model/port/AmqpPort.java | 37 ++++++++++++
.../qpid/server/model/port/AmqpPortImpl.java | 25 ++++++++
.../store/BrokerStoreUpgraderAndRecoverer.java | 35 ++++++++++++
.../qpid/server/transport/AMQPConnection.java | 4 +-
.../StoreConfigurationChangeListenerTest.java | 4 +-
.../qpid/server/model/BrokerTestHelper.java | 2 -
.../qpid/server/store/BrokerRecovererTest.java | 2 -
.../BrokerStoreUpgraderAndRecovererTest.java | 31 +++++++++-
.../protocol/v0_10/AMQPConnection_0_10.java | 5 ++
.../protocol/v0_10/AMQPConnection_0_10Impl.java | 6 ++
.../server/protocol/v0_10/ServerConnection.java | 17 +++++-
.../v0_10/ServerConnectionDelegate.java | 10 ++--
.../protocol/v0_10/ServerSessionDelegate.java | 2 +-
.../protocol/v0_10/ServerSessionTest.java | 3 +-
.../protocol/v0_8/AMQPConnection_0_8.java | 4 ++
.../protocol/v0_8/AMQPConnection_0_8Impl.java | 42 +++++++-------
.../protocol/v0_8/AMQPConnection_0_8Test.java | 2 +-
.../protocol/v1_0/AMQPConnection_1_0.java | 8 +++
.../protocol/v1_0/AMQPConnection_1_0Impl.java | 60 +++++++++-----------
.../src/main/java/resources/editBroker.html | 33 +----------
.../java/resources/js/qpid/management/Broker.js | 4 +-
.../resources/js/qpid/management/Connection.js | 4 ++
.../resources/js/qpid/management/editBroker.js | 4 +-
.../src/main/java/resources/showBroker.html | 12 ----
.../src/main/java/resources/showConnection.html | 4 ++
.../org/apache/qpid/client/HeartbeatTest.java | 10 +++-
.../qpid/systest/rest/BrokerRestTest.java | 4 --
.../qpid/systest/rest/acl/BrokerACLTest.java | 38 ++++++-------
.../ImmediateAndMandatoryPublishingTest.java | 19 +++++--
.../unit/transacted/TransactionTimeoutTest.java | 18 +++++-
34 files changed, 291 insertions(+), 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 98523c3..d9cd370 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -53,11 +53,6 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
String PREFERENCE_STORE_ATTRIBUTES = "preferenceStoreAttributes";
String CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT = "channel.flowControlEnforcementTimeout";
-
- String CONNECTION_SESSION_COUNT_LIMIT = "connection.sessionCountLimit";
- String CONNECTION_HEART_BEAT_DELAY = "connection.heartBeatDelay";
- String CONNECTION_CLOSE_WHEN_NO_ROUTE = "connection.closeWhenNoRoute";
-
String BROKER_FLOW_TO_DISK_THRESHOLD = "broker.flowToDiskThreshold";
String BROKER_FAIL_STARTUP_WITH_ERRORED_CHILD = "broker.failStartupWithErroredChild";
@@ -142,17 +137,6 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
@DerivedAttribute
int getNumberOfCores();
- @ManagedAttribute( defaultValue = "256" )
- int getConnection_sessionCountLimit();
-
- @ManagedAttribute( defaultValue = "0", description = "The default frequency with which Broker and client will exchange heartbeat messages (in seconds). "
- + "Clients may negotiate a different heartbeat frequency or disable it altogether. "
- + "A value of 0 disables heart beating.")
- int getConnection_heartBeatDelay();
-
- @ManagedAttribute( defaultValue = "true" )
- boolean getConnection_closeWhenNoRoute();
-
@ManagedAttribute( defaultValue = "0", description = "Period (in seconds) of the statistic report.")
int getStatisticsReportingPeriod();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
index c14289f..788f0b9 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
@@ -116,8 +116,7 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
private final BrokerPrincipal _principal;
- private String[] POSITIVE_NUMERIC_ATTRIBUTES = { CONNECTION_SESSION_COUNT_LIMIT,
- CONNECTION_HEART_BEAT_DELAY, STATISTICS_REPORTING_PERIOD };
+ private String[] POSITIVE_NUMERIC_ATTRIBUTES = { STATISTICS_REPORTING_PERIOD };
private AuthenticationProvider<?> _managementModeAuthenticationProvider;
@@ -126,12 +125,6 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
@ManagedAttributeField
- private int _connection_sessionCountLimit;
- @ManagedAttributeField
- private int _connection_heartBeatDelay;
- @ManagedAttributeField
- private boolean _connection_closeWhenNoRoute;
- @ManagedAttributeField
private int _statisticsReportingPeriod;
@ManagedAttributeField
private boolean _statisticsReportingResetEnabled;
@@ -508,24 +501,6 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
}
@Override
- public int getConnection_sessionCountLimit()
- {
- return _connection_sessionCountLimit;
- }
-
- @Override
- public int getConnection_heartBeatDelay()
- {
- return _connection_heartBeatDelay;
- }
-
- @Override
- public boolean getConnection_closeWhenNoRoute()
- {
- return _connection_closeWhenNoRoute;
- }
-
- @Override
public int getStatisticsReportingPeriod()
{
return _statisticsReportingPeriod;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java b/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
index 5bd6aad..0f62a0b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
@@ -90,7 +90,7 @@ public interface Connection<X extends Connection<X>> extends ConfiguredObject<X>
@DerivedAttribute
String getRemoteProcessPid();
- @DerivedAttribute
+ @DerivedAttribute(description = "The actual negotiated value of session count limit")
int getSessionCountLimit();
@DerivedAttribute
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
index 271cf84..91f7cdb 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.model;
-import java.util.Collection;
import java.util.Date;
@ManagedObject( creatable = false, amqpName = "org.apache.qpid.Session")
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
index 44756cc..b75266f 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
@@ -96,6 +96,24 @@ public interface AmqpPort<X extends AmqpPort<X>> extends Port<X>
String PROPERTY_DEFAULT_SUPPORTED_PROTOCOL_REPLY = "qpid.broker_default_supported_protocol_version_reply";
+ String CLOSE_WHEN_NO_ROUTE = "qpid.port.closeWhenNoRoute";
+
+ @SuppressWarnings("unused")
+ @ManagedContextDefault(name = CLOSE_WHEN_NO_ROUTE)
+ boolean DEFAULT_CONNECTION_CLOSE_WHEN_NO_ROUTE = true;
+
+ String SESSION_COUNT_LIMIT = "qpid.port.sessionCountLimit";
+
+ @SuppressWarnings("unused")
+ @ManagedContextDefault(name = SESSION_COUNT_LIMIT)
+ int DEFAULT_SESSION_COUNT_LIMIT = 256;
+
+ String HEART_BEAT_DELAY = "qpid.port.heartbeatDelay";
+
+ @SuppressWarnings("unused")
+ @ManagedContextDefault(name = HEART_BEAT_DELAY)
+ int DEFAULT_HEART_BEAT_DELAY = 0;
+
SSLContext getSSLContext();
@ManagedAttribute(defaultValue = "*")
@@ -138,6 +156,25 @@ public interface AmqpPort<X extends AmqpPort<X>> extends Port<X>
+ " the connection will be aborted.")
long getProtocolHandshakeTimeout();
+ @DerivedAttribute(description = "Controls behaviour when the Broker receives a message for which no destination exists"
+ + " or is otherwise rejected by the destination. For AMQP 0-8..0-91 the connection will"
+ + " be closed only if transactionally publishing a message with the mandatory flag"
+ + " set and the Publisher Confirms extension is disabled."
+ + " For 0-10, the session will be closed if publishing a message (without the discard"
+ + " unroutable flag). In all other cases, this flag has no effect.")
+ boolean getCloseWhenNoRoute();
+
+
+ @DerivedAttribute(description = "The maximum number of sessions which can exist concurrently on a connection." )
+ int getSessionCountLimit();
+
+ @DerivedAttribute(description = "For AMQP 0-8..0-10 the default period with which Broker and client will exchange"
+ + " heartbeat messages (in seconds). Clients may negotiate a different heartbeat"
+ + " frequency or disable it altogether."
+ + " For AMQP 1.0 this setting controls the incoming idle timeout only. A value of"
+ + " 0 disables.")
+ int getHeartbeatDelay();
+
boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress);
int incrementConnectionCount();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
index d928b27..15630b7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
@@ -103,6 +103,10 @@ public class AmqpPortImpl extends AbstractPort<AmqpPortImpl> implements AmqpPort
private volatile int _connectionWarnCount;
private volatile long _protocolHandshakeTimeout;
private volatile int _boundPort = -1;
+ private volatile boolean _closeWhenNoRoute;
+ private volatile int _sessionCountLimit;
+ private volatile int _heartBeatDelay;
+
@ManagedObjectFactoryConstructor
public AmqpPortImpl(Map<String, Object> attributes, Container<?> container)
@@ -188,6 +192,9 @@ public class AmqpPortImpl extends AbstractPort<AmqpPortImpl> implements AmqpPort
super.onOpen();
_protocolHandshakeTimeout = getContextValue(Long.class, AmqpPort.PROTOCOL_HANDSHAKE_TIMEOUT);
_connectionWarnCount = getContextValue(Integer.class, OPEN_CONNECTIONS_WARN_PERCENT);
+ _closeWhenNoRoute = getContextValue(Boolean.class, AmqpPort.CLOSE_WHEN_NO_ROUTE);
+ _sessionCountLimit = getContextValue(Integer.class, AmqpPort.SESSION_COUNT_LIMIT);
+ _heartBeatDelay = getContextValue(Integer.class, AmqpPort.HEART_BEAT_DELAY);
}
@Override
@@ -553,4 +560,22 @@ public class AmqpPortImpl extends AbstractPort<AmqpPortImpl> implements AmqpPort
{
return _protocolHandshakeTimeout;
}
+
+ @Override
+ public boolean getCloseWhenNoRoute()
+ {
+ return _closeWhenNoRoute;
+ }
+
+ @Override
+ public int getSessionCountLimit()
+ {
+ return _sessionCountLimit;
+ }
+
+ @Override
+ public int getHeartbeatDelay()
+ {
+ return _heartBeatDelay;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java b/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java
index b736fef..a5d2344 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java
@@ -552,6 +552,13 @@ public class BrokerStoreUpgraderAndRecoverer extends AbstractConfigurationStoreU
private class Upgrader_6_1_to_7_0 extends StoreUpgraderPhase
{
+ private Map<String,String> BROKER_ATTRIBUTES_MOVED_INTO_CONTEXT = new HashMap<>();
+ {
+ BROKER_ATTRIBUTES_MOVED_INTO_CONTEXT.put("connection.sessionCountLimit", "qpid.port.sessionCountLimit");
+ BROKER_ATTRIBUTES_MOVED_INTO_CONTEXT.put("connection.heartBeatDelay", "qpid.port.heartbeatDelay");
+ BROKER_ATTRIBUTES_MOVED_INTO_CONTEXT.put("connection.closeWhenNoRoute", "qpid.port.closeWhenNoRoute");
+ };
+
public Upgrader_6_1_to_7_0()
{
super("modelVersion", "6.1", "7.0");
@@ -562,6 +569,34 @@ public class BrokerStoreUpgraderAndRecoverer extends AbstractConfigurationStoreU
{
if (record.getType().equals("Broker"))
{
+ Map<String, Object> attributes = new HashMap<>(record.getAttributes());
+ Map<String, String> additionalContext = new HashMap<>();
+ for (String attributeName : BROKER_ATTRIBUTES_MOVED_INTO_CONTEXT.keySet())
+ {
+ Object value = attributes.remove(attributeName);
+ if (value != null)
+ {
+ additionalContext.put(BROKER_ATTRIBUTES_MOVED_INTO_CONTEXT.get(attributeName),
+ String.valueOf(value));
+ }
+ }
+
+ if (!additionalContext.isEmpty())
+ {
+ Map<String, String> newContext = new HashMap<>();
+ if (attributes.containsKey("context"))
+ {
+ newContext.putAll((Map<String, String>) attributes.get("context"));
+ }
+ newContext.putAll(additionalContext);
+ attributes.put("context", newContext);
+
+ record = new ConfiguredObjectRecordImpl(record.getId(),
+ record.getType(),
+ attributes,
+ record.getParents());
+ }
+
upgradeRootRecord(record);
}
else if (record.getType().equals("Port"))
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
index d5719c4..b7acc15 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
@@ -32,9 +32,9 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.session.AMQPSession;
import org.apache.qpid.server.txn.LocalTransaction;
-import org.apache.qpid.server.txn.TransactionObserver;
import org.apache.qpid.server.util.Deletable;
public interface AMQPConnection<C extends AMQPConnection<C>>
@@ -118,4 +118,6 @@ public interface AMQPConnection<C extends AMQPConnection<C>>
long getMaxMessageSize();
+ AmqpPort<?> getPort();
+
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java b/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
index 7280519..381ee30 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
@@ -88,8 +88,8 @@ public class StoreConfigurationChangeListenerTest extends QpidTestCase
Broker broker = mock(Broker.class);
when(broker.getCategoryClass()).thenReturn(Broker.class);
when(broker.isDurable()).thenReturn(true);
- _listener.attributeSet(broker, Broker.CONNECTION_SESSION_COUNT_LIMIT, null, 1);
- verify(_store).update(eq(false),any(ConfiguredObjectRecord.class));
+ _listener.attributeSet(broker, Broker.DESCRIPTION, null, "test description");
+ verify(_store).update(eq(false), any(ConfiguredObjectRecord.class));
}
public void testChildAddedWhereParentManagesChildStorage()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-core/src/test/java/org/apache/qpid/server/model/BrokerTestHelper.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/model/BrokerTestHelper.java b/broker-core/src/test/java/org/apache/qpid/server/model/BrokerTestHelper.java
index 728465f..f1958ff 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/model/BrokerTestHelper.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/model/BrokerTestHelper.java
@@ -134,8 +134,6 @@ public class BrokerTestHelper
when(systemConfig.getCategoryClass()).thenReturn(SystemConfig.class);
Broker broker = mockWithSystemPrincipalAndAccessControl(Broker.class, SYSTEM_PRINCIPAL, accessControl);
- when(broker.getConnection_sessionCountLimit()).thenReturn(1);
- when(broker.getConnection_closeWhenNoRoute()).thenReturn(false);
when(broker.getId()).thenReturn(UUID.randomUUID());
when(broker.getObjectFactory()).thenReturn(objectFactory);
when(broker.getModel()).thenReturn(objectFactory.getModel());
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java b/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java
index b3f9679..bdcc4c6 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java
@@ -103,8 +103,6 @@ public class BrokerRecovererTest extends QpidTestCase
{
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(Broker.NAME, getName());
- attributes.put(Broker.CONNECTION_SESSION_COUNT_LIMIT, 1000);
- attributes.put(Broker.CONNECTION_HEART_BEAT_DELAY, 2000);
attributes.put(Broker.STATISTICS_REPORTING_PERIOD, 4000);
attributes.put(Broker.STATISTICS_REPORTING_RESET_ENABLED, true);
attributes.put(Broker.MODEL_VERSION, BrokerModel.MODEL_VERSION);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java b/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java
index c15ea6e..357048e 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -57,6 +56,7 @@ public class BrokerStoreUpgraderAndRecovererTest extends QpidTestCase
private UUID _hostId;
private UUID _brokerId;
+ @Override
public void setUp() throws Exception
{
super.setUp();
@@ -819,6 +819,35 @@ public class BrokerStoreUpgraderAndRecovererTest extends QpidTestCase
"1000", upgradedContext.get("qpid.port.http.acceptBacklog"));
}
+ public void testBrokerConnectionAttributesRemoval() throws Exception
+ {
+ _brokerRecord.getAttributes().put("modelVersion", "6.1");
+ _brokerRecord.getAttributes().put("connection.sessionCountLimit", "512");
+ _brokerRecord.getAttributes().put("connection.heartBeatDelay", "300");
+ _brokerRecord.getAttributes().put("connection.closeWhenNoRoute", "false");
+
+ DurableConfigurationStore dcs = new DurableConfigurationStoreStub(_brokerRecord);
+ BrokerStoreUpgraderAndRecoverer recoverer = new BrokerStoreUpgraderAndRecoverer(_systemConfig);
+ List<ConfiguredObjectRecord> records = upgrade(dcs, recoverer);
+
+ ConfiguredObjectRecord upgradedBroker = findRecordById(_brokerRecord.getId(), records);
+ assertNotNull("Upgraded broker record is not found", upgradedBroker);
+ Map<String, Object> upgradedAttributes = upgradedBroker.getAttributes();
+
+ final Map<String, String> expectedContext = new HashMap<>();
+ expectedContext.put("qpid.port.sessionCountLimit", "512");
+ expectedContext.put("qpid.port.heartbeatDelay", "300");
+ expectedContext.put("qpid.port.closeWhenNoRoute", "false");
+
+ Object upgradedContext = upgradedAttributes.get("context");
+ assertTrue("Unpexcted context", upgradedContext instanceof Map);
+ assertEquals("Unexpected context", expectedContext, new HashMap<>(((Map<String, String>) upgradedContext)));
+
+ assertFalse("Session count limit is not removed", upgradedAttributes.containsKey("connection.sessionCountLimit"));
+ assertFalse("Heart beat delay is not removed", upgradedAttributes.containsKey("connection.heartBeatDelay"));
+ assertFalse("Close when no route is not removed", upgradedAttributes.containsKey("conection.closeWhenNoRoute"));
+ }
+
private void assertModelVersionUpgraded(final List<ConfiguredObjectRecord> records)
{
ConfiguredObjectRecord upgradedBrokerRecord = findRecordById(_brokerRecord.getId(), records);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
index 5ec8c7b..f117f62 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
@@ -26,6 +26,7 @@ import javax.security.auth.Subject;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.model.ContextProvider;
+import org.apache.qpid.server.model.DerivedAttribute;
import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.transport.AMQPConnection;
@@ -57,4 +58,8 @@ public interface AMQPConnection_0_10<C extends AMQPConnection_0_10<C>> extends A
AccessControlContext getAccessControllerContext();
void performDeleteTasks();
+
+ @DerivedAttribute(description = "The actual negotiated value of heartbeat delay.")
+ int getHeartbeatDelay();
+
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
index 673cce8..9458b77 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
@@ -219,6 +219,12 @@ public class AMQPConnection_0_10Impl extends AbstractAMQPConnection<AMQPConnecti
}
@Override
+ public int getHeartbeatDelay()
+ {
+ return _connection.getHeartBeatDelay();
+ }
+
+ @Override
public void setTransportBlockedForWriting(final boolean blocked)
{
if(_transportBlockedForWriting != blocked)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 35d0cc4..11e2ab5 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -100,9 +100,10 @@ public class ServerConnection extends ConnectionInvoker
private ServerConnectionDelegate delegate;
private ProtocolEventSender sender;
private State state = NEW;
- private int channelMax = 1;
+ private int _channelMax = 1;
private String locale;
private SocketAddress _remoteAddress;
+ private int _heartBeatDelay;
public ServerConnection(final long connectionId,
Broker<?> broker,
@@ -544,12 +545,12 @@ public class ServerConnection extends ConnectionInvoker
public int getChannelMax()
{
- return channelMax;
+ return _channelMax;
}
protected void setChannelMax(int max)
{
- channelMax = max;
+ _channelMax = max;
}
private int map(ServerSession ssn)
@@ -744,6 +745,16 @@ public class ServerConnection extends ConnectionInvoker
super.connectionStart(clientProperties, mechanisms, locales, options);
}
+ public void setHeartBeatDelay(final int heartBeatDelay)
+ {
+ _heartBeatDelay = heartBeatDelay;
+ }
+
+ public int getHeartBeatDelay()
+ {
+ return _heartBeatDelay;
+ }
+
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
private class ProcessPendingIterator implements Iterator<Runnable>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index a2176b6..129e393 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -39,7 +39,6 @@ import org.apache.qpid.server.common.ServerPropertyNames;
import org.apache.qpid.server.configuration.CommonProperties;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.NamedAddressSpace;
-import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.properties.ConnectionStartProperties;
import org.apache.qpid.server.protocol.v0_10.transport.*;
@@ -55,6 +54,7 @@ import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
public class ServerConnectionDelegate extends MethodDelegate<ServerConnection> implements ProtocolDelegate<ServerConnection>
{
private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnectionDelegate.class);
+ private final AmqpPort<?> _port;
private List<Object> _locales;
private List<Object> _mechanisms;
@@ -82,13 +82,14 @@ public class ServerConnectionDelegate extends MethodDelegate<ServerConnection> i
private volatile SubjectAuthenticationResult _successfulAuthenticationResult;
- public ServerConnectionDelegate(Port<?> port, boolean secure, final String selectedHost)
+ public ServerConnectionDelegate(AmqpPort<?> port, boolean secure, final String selectedHost)
{
+ _port = port;
_broker = (Broker<?>) port.getParent();
_clientProperties = createConnectionProperties((Broker<?>) port.getParent());
_mechanisms = new ArrayList<>(port.getAuthenticationProvider().getAvailableMechanisms(secure));
- _maxNoOfChannels = _broker.getConnection_sessionCountLimit();
+ _maxNoOfChannels = port.getSessionCountLimit();
_subjectCreator = port.getSubjectCreator(secure, selectedHost);
_maximumFrameSize = Math.min(0xffff, _broker.getNetworkBufferSize());
}
@@ -368,6 +369,7 @@ public class ServerConnectionDelegate extends MethodDelegate<ServerConnection> i
if(ok.hasHeartbeat() && ok.getHeartbeat() > 0)
{
int heartbeat = ok.getHeartbeat();
+ sconn.setHeartBeatDelay(heartbeat);
long readerIdle = 2000L * heartbeat;
long writerIdle = 1000L * heartbeat;
sconn.getAmqpConnection().initialiseHeartbeating(writerIdle, readerIdle);
@@ -515,7 +517,7 @@ public class ServerConnectionDelegate extends MethodDelegate<ServerConnection> i
protected int getHeartbeatMax()
{
- int delay = (Integer)_broker.getAttribute(Broker.CONNECTION_HEART_BEAT_DELAY);
+ int delay = _port.getHeartbeatDelay();
return delay == 0 ? 0xFFFF : delay;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index ae529a4..735ccb5 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -495,7 +495,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
boolean explictlyRejected = routingResult.containsReject(RejectType.LIMIT_EXCEEDED);
if (!routingResult.hasRoutes() || explictlyRejected)
{
- boolean closeWhenNoRoute = serverSession.getAMQPConnection().getBroker().getConnection_closeWhenNoRoute();
+ boolean closeWhenNoRoute = serverSession.getAMQPConnection().getPort().getCloseWhenNoRoute();
boolean discardUnroutable = delvProps != null && delvProps.getDiscardUnroutable();
if (!discardUnroutable && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
index 393b4fc..a7f854b 100644
--- a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
+++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
@@ -103,13 +103,14 @@ public class ServerSessionTest extends QpidTestCase
when(modelConnection.closeAsync()).thenReturn(Futures.immediateFuture(null));
when(modelConnection.getAddressSpace()).thenReturn(_virtualHost);
when(modelConnection.getContextProvider()).thenReturn(_virtualHost);
- when(modelConnection.getBroker()).thenReturn((Broker)broker);
+ when(modelConnection.getBroker()).thenReturn(broker);
when(modelConnection.getEventLogger()).thenReturn(mock(EventLogger.class));
when(modelConnection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Session.PRODUCER_AUTH_CACHE_TIMEOUT_DEFAULT);
when(modelConnection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Session.PRODUCER_AUTH_CACHE_SIZE_DEFAULT);
when(modelConnection.getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE)).thenReturn(Connection.DEFAULT_MAX_UNCOMMITTED_IN_MEMORY_SIZE);
when(modelConnection.getChildExecutor()).thenReturn(_taskExecutor);
when(modelConnection.getModel()).thenReturn(BrokerModel.getInstance());
+ when(modelConnection.getPort()).thenReturn(port);
Subject subject = new Subject();
when(modelConnection.getSubject()).thenReturn(subject);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
index e1409f3..66d45d9 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import org.apache.qpid.server.model.DerivedAttribute;
import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock;
import org.apache.qpid.server.protocol.v0_8.transport.MethodRegistry;
import org.apache.qpid.server.protocol.ProtocolVersion;
@@ -50,6 +51,9 @@ public interface AMQPConnection_0_8<C extends AMQPConnection_0_8<C>> extends AMQ
@ManagedContextDefault(name= BATCH_LIMIT)
long DEFAULT_BATCH_LIMIT = 10L;
+ @DerivedAttribute(description = "The actual negotiated value of heartbeat delay.")
+ int getHeartbeatDelay();
+
MethodRegistry getMethodRegistry();
void writeFrame(AMQDataBlock frame);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
index c46f069..82ab243 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
@@ -160,6 +160,7 @@ public class AMQPConnection_0_8Impl
private final Set<AMQPSession<?,?>> _sessionsWithWork =
Collections.newSetFromMap(new ConcurrentHashMap<AMQPSession<?,?>, Boolean>());
+ private volatile int _heartBeatDelay;
public AMQPConnection_0_8Impl(Broker<?> broker,
ServerNetworkConnection network,
@@ -172,7 +173,7 @@ public class AMQPConnection_0_8Impl
super(broker, network, port, transport, protocol, connectionId, aggregateTicker);
- _maxNoOfChannels = broker.getConnection_sessionCountLimit();
+ _maxNoOfChannels = port.getSessionCountLimit();
_decoder = new BrokerDecoder(this);
_binaryDataLimit = getBroker().getContextKeys(false).contains(BROKER_DEBUG_BINARY_DATA_LENGTH)
? getBroker().getContextValue(Integer.class, BROKER_DEBUG_BINARY_DATA_LENGTH)
@@ -182,7 +183,7 @@ public class AMQPConnection_0_8Impl
_sendQueueDeleteOkRegardlessClientVerRegexp = Pattern.compile(sendQueueDeleteOkRegardlessRegexp);
_sender = network.getSender();
- _closeWhenNoRoute = getBroker().getConnection_closeWhenNoRoute();
+ _closeWhenNoRoute = port.getCloseWhenNoRoute();
}
@Override
@@ -429,16 +430,6 @@ public class AMQPConnection_0_8Impl
session.dispose();
}
- public int getMaximumNumberOfChannels()
- {
- return _maxNoOfChannels;
- }
-
- private void setMaximumNumberOfChannels(int value)
- {
- _maxNoOfChannels = value;
- }
-
@Override
public void closeChannel(AMQChannel channel)
@@ -746,7 +737,13 @@ public class AMQPConnection_0_8Impl
@Override
public int getSessionCountLimit()
{
- return getMaximumNumberOfChannels();
+ return _maxNoOfChannels;
+ }
+
+ @Override
+ public int getHeartbeatDelay()
+ {
+ return _heartBeatDelay;
}
public String getAddress()
@@ -905,11 +902,11 @@ public class AMQPConnection_0_8Impl
{
sendConnectionClose(ErrorCodes.CHANNEL_ERROR, "Channel " + channelId + " already exists", channelId);
}
- else if(channelId > getMaximumNumberOfChannels())
+ else if(channelId > getSessionCountLimit())
{
sendConnectionClose(ErrorCodes.CHANNEL_ERROR,
- "Channel " + channelId + " cannot be created as the max allowed channel id is "
- + getMaximumNumberOfChannels(),
+ "Channel " + channelId + " cannot be created as the max allowed channel id is "
+ + getSessionCountLimit(),
channelId);
}
else
@@ -1154,12 +1151,10 @@ public class AMQPConnection_0_8Impl
frameMax = Integer.MAX_VALUE;
}
- Broker<?> broker = getBroker();
-
ConnectionTuneBody tuneBody =
- methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
+ methodRegistry.createConnectionTuneBody(getPort().getSessionCountLimit(),
frameMax,
- broker.getConnection_heartBeatDelay());
+ getPort().getHeartbeatDelay());
writeFrame(tuneBody.generateFrame(0));
_state = ConnectionState.AWAIT_TUNE_OK;
disposeSaslNegotiator();
@@ -1195,6 +1190,7 @@ public class AMQPConnection_0_8Impl
if (heartbeat > 0)
{
+ _heartBeatDelay = heartbeat;
long writerDelay = 1000L * heartbeat;
long readerDelay = 1000L * getContextValue(Integer.class, AMQPConnection_0_8.PROPERTY_HEARTBEAT_TIMEOUT_FACTOR) * heartbeat;
initialiseHeartbeating(writerDelay, readerDelay);
@@ -1226,10 +1222,10 @@ public class AMQPConnection_0_8Impl
setMaxFrameSize(calculatedFrameMax);
//0 means no implied limit, except that forced by protocol limitations (0xFFFF)
- setMaximumNumberOfChannels( ((channelMax == 0) || (channelMax > 0xFFFF))
+ int value = ((channelMax == 0) || (channelMax > 0xFFFF))
? 0xFFFF
- : channelMax);
-
+ : channelMax;
+ _maxNoOfChannels = value;
}
_state = ConnectionState.AWAIT_OPEN;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Test.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Test.java
index d035750..2631eaa 100644
--- a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Test.java
+++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Test.java
@@ -222,7 +222,7 @@ public class AMQPConnection_0_8Test extends QpidTestCase
int channelCount = conn.getSessionModels().size();
assertEquals("Initial channel count wrong", 0, channelCount);
- assertEquals("Number of channels not correctly set.", maxChannels, conn.getMaximumNumberOfChannels());
+ assertEquals("Number of channels not correctly set.", maxChannels, conn.getSessionCountLimit());
assertFalse("Connection should not be closed after opening " + maxChannels + " channels",
conn.isClosing());
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
index 9428e10..390b227 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.logging.EventLoggerProvider;
+import org.apache.qpid.server.model.DerivedAttribute;
import org.apache.qpid.server.model.ManagedContextDefault;
import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
@@ -51,6 +52,13 @@ public interface AMQPConnection_1_0<C extends AMQPConnection_1_0<C>> extends AMQ
Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
Symbol SHARED_SUBSCRIPTIONS = Symbol.valueOf("SHARED-SUBS");
+ @DerivedAttribute(description = "The idle timeout (in milliseconds) for incoming traffic.")
+ long getIncomingIdleTimeout();
+
+ @DerivedAttribute(description = "The period (in milliseconds) with which the Broker will generate heartbeat"
+ + " traffic if the wire would otherwise be idle.")
+ long getOutgoingIdleTimeout();
+
Object getReference();
String getRemoteContainerId();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 0d5d2af..1cbce38 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -182,10 +182,6 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
private SoleConnectionEnforcementPolicy _soleConnectionEnforcementPolicy;
private static final int CONNECTION_CONTROL_CHANNEL = 0;
-
- private static final int DEFAULT_CHANNEL_MAX = Math.min(Integer.getInteger("amqp.channel_max", 255), 0xFFFF);
-
- private AmqpPort<?> _port;
private SubjectCreator _subjectCreator;
private int _channelMax = 0;
@@ -201,7 +197,9 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
private Session_1_0[] _receivingSessions;
private volatile boolean _closedForOutput;
- private long _idleTimeout;
+ private final long _incomingIdleTimeout;
+
+ private volatile long _outgoingIdleTimeout;
private volatile ConnectionState _connectionState = ConnectionState.AWAIT_AMQP_OR_SASL_HEADER;
@@ -217,7 +215,6 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
private volatile SaslNegotiator _saslNegotiator;
private String _localHostname;
- private long _desiredIdleTimeout;
private static final long MINIMUM_SUPPORTED_IDLE_TIMEOUT = 1000L;
@@ -252,9 +249,6 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
super(broker, network, port, transport, Protocol.AMQP_1_0, id, aggregateTicker);
_subjectCreator = port.getSubjectCreator(transport.isSecure(), network.getSelectedHost());
-
- _port = port;
-
_properties.put(Symbol.valueOf(ServerPropertyNames.PRODUCT), CommonProperties.getProductName());
_properties.put(Symbol.valueOf(ServerPropertyNames.VERSION), CommonProperties.getReleaseVersion());
_properties.put(Symbol.valueOf(ServerPropertyNames.QPID_BUILD), CommonProperties.getBuildVersion());
@@ -269,7 +263,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
setRemoteAddress(network.getRemoteAddress());
- setDesiredIdleTimeout(1000L * broker.getConnection_heartBeatDelay());
+ _incomingIdleTimeout = 1000L * port.getHeartbeatDelay();
_frameWriter = new FrameWriter(getDescribedTypeRegistry(), getSender());
}
@@ -412,12 +406,20 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
setSubject(_subjectCreator.createSubjectWithGroups(user));
}
- private long getDesiredIdleTimeout()
+ @Override
+ public long getIncomingIdleTimeout()
{
- return _desiredIdleTimeout;
+ return _incomingIdleTimeout;
}
@Override
+ public long getOutgoingIdleTimeout()
+ {
+ return _outgoingIdleTimeout;
+ }
+
+
+ @Override
public void receiveAttach(final int channel, final Attach attach)
{
assertState(ConnectionState.OPENED);
@@ -612,11 +614,6 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
return _remoteContainerId;
}
- private void setDesiredIdleTimeout(final long desiredIdleTimeout)
- {
- _desiredIdleTimeout = desiredIdleTimeout;
- }
-
public boolean isOpen()
{
return _connectionState == ConnectionState.OPENED;
@@ -794,10 +791,11 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
{
assertState(ConnectionState.AWAIT_OPEN);
- _channelMax = open.getChannelMax() == null ? DEFAULT_CHANNEL_MAX
- : open.getChannelMax().intValue() < DEFAULT_CHANNEL_MAX
+ int channelMax = getPort().getSessionCountLimit() - 1;
+ _channelMax = open.getChannelMax() == null ? channelMax
+ : open.getChannelMax().intValue() < channelMax
? open.getChannelMax().intValue()
- : DEFAULT_CHANNEL_MAX;
+ : channelMax;
if (_receivingSessions == null)
{
_receivingSessions = new Session_1_0[_channelMax + 1];
@@ -820,7 +818,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
}
if (open.getIdleTimeOut() != null)
{
- _idleTimeout = open.getIdleTimeOut().longValue();
+ _outgoingIdleTimeout = open.getIdleTimeOut().longValue();
}
_remoteProperties = open.getProperties() == null ? Collections.emptyMap() : Collections.unmodifiableMap(new LinkedHashMap<>(open.getProperties()));
_remoteDesiredCapabilities = open.getDesiredCapabilities() == null ? Collections.emptySet() : Sets.newHashSet(open.getDesiredCapabilities());
@@ -845,19 +843,18 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
}
}
- if (_idleTimeout != 0L && _idleTimeout < MINIMUM_SUPPORTED_IDLE_TIMEOUT)
+ if (_outgoingIdleTimeout != 0L && _outgoingIdleTimeout < MINIMUM_SUPPORTED_IDLE_TIMEOUT)
{
closeConnection(ConnectionError.CONNECTION_FORCED,
"Requested idle timeout of "
- + _idleTimeout
+ + _outgoingIdleTimeout
+ " is too low. The minimum supported timeout is"
+ MINIMUM_SUPPORTED_IDLE_TIMEOUT);
}
else
{
- long desiredIdleTimeout = getDesiredIdleTimeout();
- initialiseHeartbeating(_idleTimeout / 2L, desiredIdleTimeout);
- final NamedAddressSpace addressSpace = _port.getAddressSpace(_localHostname);
+ initialiseHeartbeating(_outgoingIdleTimeout / 2L, _incomingIdleTimeout);
+ final NamedAddressSpace addressSpace = getPort().getAddressSpace(_localHostname);
if (addressSpace == null)
{
closeConnection(AmqpError.NOT_FOUND, "Unknown hostname in connection open: '" + _localHostname + "'");
@@ -981,7 +978,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
private void populateConnectionRedirect(final NamedAddressSpace addressSpace, final Error err)
{
- final String redirectHost = addressSpace.getRedirectHost(_port);
+ final String redirectHost = addressSpace.getRedirectHost(getPort());
if(redirectHost == null)
{
@@ -1470,6 +1467,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
{
return _transportBlockedForWriting;
}
+
@Override
public void setTransportBlockedForWriting(final boolean blocked)
{
@@ -1677,7 +1675,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
open.setMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
// TODO - should we try to set the hostname based on the connection information?
// open.setHostname();
- open.setIdleTimeOut(UnsignedInteger.valueOf(_desiredIdleTimeout));
+ open.setIdleTimeOut(UnsignedInteger.valueOf(_incomingIdleTimeout));
// set the offered capabilities
if(_offeredCapabilities != null && !_offeredCapabilities.isEmpty())
@@ -1798,12 +1796,6 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
}
@Override
- public void initialiseHeartbeating(final long writerDelay, final long readerDelay)
- {
- super.initialiseHeartbeating(writerDelay, readerDelay);
- }
-
- @Override
public Iterator<IdentifiedTransaction> getOpenTransactions()
{
return new Iterator<IdentifiedTransaction>()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-plugins/management-http/src/main/java/resources/editBroker.html
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/editBroker.html b/broker-plugins/management-http/src/main/java/resources/editBroker.html
index 16cdd55..c5bab92 100644
--- a/broker-plugins/management-http/src/main/java/resources/editBroker.html
+++ b/broker-plugins/management-http/src/main/java/resources/editBroker.html
@@ -71,38 +71,7 @@
promptMessage: 'If encryption is enabled, configurations items such as passwords<br/>will be encrypted before being written to the configuration store.'" />
</div>
</div>
-
- <div class="clear formBox">
- <fieldset>
- <legend>Global Connection Defaults</legend>
- <div class="clear">
- <div class="formLabel-labelCell tableContainer-labelCell">Maximum number of sessions:</div>
- <div class="tableContainer-valueCell formLabel-controlCell">
- <input data-dojo-type="dijit/form/ValidationTextBox"
- id="editBroker.connection.sessionCountLimit"
- data-dojo-props="
- name: 'connection.sessionCountLimit',
- placeHolder: 'Number of sessions',
- trim: true,
- promptMessage: 'Maximum number of sessions per connection' "/>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell tableContainer-labelCell">Heart beat delay (s):</div>
- <div class="tableContainer-valueCell formLabel-controlCell">
- <input data-dojo-type="dijit/form/ValidationTextBox"
- id="editBroker.connection.heartBeatDelay"
- data-dojo-props="
- name: 'connection.heartBeatDelay',
- trim: true,
- invalidMessage: 'Invalid value',
- placeHolder: 'Time in seconds',
- promptMessage: 'Interval between heart beat messages exchanged between broker and clients'"/>
- </div>
- </div>
- </fieldset>
- </div>
-
+ <div class="clear"></div>
<div data-dojo-type="dijit/TitlePane" data-dojo-props="title: 'Context variables', open: false">
<div id="editBroker.context"
data-dojo-type="qpid/common/ContextVariablesEditor"
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js
index 2c60bed..96d1157 100644
--- a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js
+++ b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js
@@ -84,9 +84,7 @@ define(["dojo/parser",
"modelVersion",
"statisticsReportingPeriod",
"statisticsReportingResetEnabled",
- "confidentialConfigurationEncryptionProvider",
- "connection.sessionCountLimit",
- "connection.heartBeatDelay"];
+ "confidentialConfigurationEncryptionProvider"];
function Broker(kwArgs)
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js
index 5af855c..2c6b0b2 100644
--- a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js
+++ b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js
@@ -135,6 +135,7 @@ define(["dojo/parser",
"protocol",
"remoteProcessPid",
"createdTime",
+ "sessionCountLimit",
"lastIoTime",
"msgInRate",
"bytesInRate",
@@ -263,6 +264,9 @@ define(["dojo/parser",
this.protocol.innerHTML = entities.encode(String(this.connectionData["protocol"]));
var remoteProcessPid = this.connectionData["remoteProcessPid"];
this.remoteProcessPid.innerHTML = entities.encode(String(remoteProcessPid ? remoteProcessPid : "N/A"));
+
+ this.sessionCountLimit.innerHTML = entities.encode(String(this.connectionData["sessionCountLimit"]));
+
var userPreferences = this.management.userPreferences;
this.createdTime.innerHTML = userPreferences.formatDateTime(this.connectionData["createdTime"], {
addOffset: true,
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editBroker.js
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editBroker.js b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editBroker.js
index 64f46d5..be8a67f 100644
--- a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editBroker.js
+++ b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editBroker.js
@@ -59,9 +59,7 @@ define(["dojox/html/entities",
util,
template)
{
- var numericFieldNames = ["statisticsReportingPeriod",
- "connection.sessionCountLimit",
- "connection.heartBeatDelay"];
+ var numericFieldNames = ["statisticsReportingPeriod"];
var brokerEditor = {
init: function ()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-plugins/management-http/src/main/java/resources/showBroker.html
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/showBroker.html b/broker-plugins/management-http/src/main/java/resources/showBroker.html
index 00973f0..0d13b12 100644
--- a/broker-plugins/management-http/src/main/java/resources/showBroker.html
+++ b/broker-plugins/management-http/src/main/java/resources/showBroker.html
@@ -54,18 +54,6 @@
<div id="brokerAttribute.statisticsReportingResetEnabled"></div>
</div>
<div class="clear"></div>
- <br/>
- <div data-dojo-type="dijit.TitlePane" data-dojo-props="title: 'Global Connection Defaults', open: true">
- <div id="brokerAttribute.connection.sessionCountLimit.container" class="clear">
- <div class="formLabel-labelCell">Maximum number of sessions:</div>
- <div id="brokerAttribute.connection.sessionCountLimit"></div>
- </div>
- <div id="brokerAttribute.connection.heartBeatDelay.container" class="clear">
- <div class="formLabel-labelCell">Heart beat delay (s):</div>
- <div id="brokerAttribute.connection.heartBeatDelay"></div>
- </div>
- <div class="clear"></div>
- </div>
</div>
<br/>
<button data-dojo-type="dijit.form.Button" class="editBroker">Edit</button>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/broker-plugins/management-http/src/main/java/resources/showConnection.html
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/showConnection.html b/broker-plugins/management-http/src/main/java/resources/showConnection.html
index 6793e77..8ca1324 100644
--- a/broker-plugins/management-http/src/main/java/resources/showConnection.html
+++ b/broker-plugins/management-http/src/main/java/resources/showConnection.html
@@ -87,6 +87,10 @@
<div class="formLabel-labelCell">Protocol:</div>
<div class="protocol"></div>
</div>
+ <div class="clear">
+ <div class="formLabel-labelCell">Session Count Limit:</div>
+ <div class="sessionCountLimit"></div>
+ </div>
</div>
<div class="dijitDialogPaneActionBar">
<button data-dojo-type="dijit.form.Button" class="closeButton" data-dojo-props="iconClass: 'dijitIconDelete'">Close</button>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java b/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java
index 124ad1f..ca221df 100644
--- a/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java
+++ b/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java
@@ -22,6 +22,8 @@ import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_INTE
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -38,8 +40,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.test.utils.TCPTunneler;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
public class HeartbeatTest extends QpidBrokerTestCase
{
@@ -54,7 +59,10 @@ public class HeartbeatTest extends QpidBrokerTestCase
{
if (getName().equals("testHeartbeatsEnabledBrokerSide"))
{
- getDefaultBrokerConfiguration().setBrokerAttribute(Broker.CONNECTION_HEART_BEAT_DELAY, "1");
+ getDefaultBrokerConfiguration().setObjectAttribute(Port.class,
+ TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT,
+ AmqpPort.CONTEXT,
+ Collections.singletonMap(AmqpPort.HEART_BEAT_DELAY, "1"));
}
super.setUp();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/systests/src/test/java/org/apache/qpid/systest/rest/BrokerRestTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/rest/BrokerRestTest.java b/systests/src/test/java/org/apache/qpid/systest/rest/BrokerRestTest.java
index feaf728..afa8376 100644
--- a/systests/src/test/java/org/apache/qpid/systest/rest/BrokerRestTest.java
+++ b/systests/src/test/java/org/apache/qpid/systest/rest/BrokerRestTest.java
@@ -152,8 +152,6 @@ public class BrokerRestTest extends QpidRestTestCase
public void testPutToUpdateWithInvalidAttributeValues() throws Exception
{
Map<String, Object> invalidAttributes = new HashMap<String, Object>();
- invalidAttributes.put(Broker.CONNECTION_SESSION_COUNT_LIMIT, -10);
- invalidAttributes.put(Broker.CONNECTION_HEART_BEAT_DELAY, -11000);
invalidAttributes.put(Broker.STATISTICS_REPORTING_PERIOD, -12000);
for (Map.Entry<String, Object> entry : invalidAttributes.entrySet())
@@ -242,8 +240,6 @@ public class BrokerRestTest extends QpidRestTestCase
private Map<String, Object> getValidBrokerAttributes()
{
Map<String, Object> brokerAttributes = new HashMap<String, Object>();
- brokerAttributes.put(Broker.CONNECTION_SESSION_COUNT_LIMIT, 10);
- brokerAttributes.put(Broker.CONNECTION_HEART_BEAT_DELAY, 11000);
brokerAttributes.put(Broker.STATISTICS_REPORTING_PERIOD, 12000);
brokerAttributes.put(Broker.STATISTICS_REPORTING_RESET_ENABLED, true);
return brokerAttributes;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/systests/src/test/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java b/systests/src/test/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
index 9a0362e..43e95ae 100644
--- a/systests/src/test/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
+++ b/systests/src/test/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
@@ -23,6 +23,7 @@ package org.apache.qpid.systest.rest.acl;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -535,45 +536,38 @@ public class BrokerACLTest extends QpidRestTestCase
{
getRestTestHelper().setUsernameAndPassword(ALLOWED_USER, ALLOWED_USER);
- int initialSessionCountLimit = 256;
- int updatedSessionCountLimit = 299;
-
Map<String, Object> brokerAttributes = getRestTestHelper().getJsonAsSingletonList("broker");
- assertEquals("Unexpected alert repeat gap", initialSessionCountLimit,
- brokerAttributes.get(Broker.CONNECTION_SESSION_COUNT_LIMIT));
+ assertEquals("Unexpected description", null,
+ brokerAttributes.get(Broker.DESCRIPTION));
- Map<String, Object> newAttributes = new HashMap<String, Object>();
- newAttributes.put(Broker.CONNECTION_SESSION_COUNT_LIMIT, updatedSessionCountLimit);
+ String descriptionValue = "test description";
- int responseCode = getRestTestHelper().submitRequest("broker", "PUT", newAttributes);
- assertEquals("Setting of port attribites should be allowed", 200, responseCode);
+ getRestTestHelper().submitRequest("broker",
+ "PUT",
+ Collections.singletonMap(Broker.DESCRIPTION, descriptionValue),
+ 200);
brokerAttributes = getRestTestHelper().getJsonAsSingletonList("broker");
- assertEquals("Unexpected default alert repeat gap", updatedSessionCountLimit,
- brokerAttributes.get(Broker.CONNECTION_SESSION_COUNT_LIMIT));
+ assertEquals("Unexpected description", descriptionValue,
+ brokerAttributes.get(Broker.DESCRIPTION));
}
public void testSetBrokerAttributesDenied() throws Exception
{
getRestTestHelper().setUsernameAndPassword(ALLOWED_USER, ALLOWED_USER);
- int initialSessionCountLimit = 256;
- int updatedSessionCountLimit = 299;
-
Map<String, Object> brokerAttributes = getRestTestHelper().getJsonAsSingletonList("broker");
- assertEquals("Unexpected alert repeat gap", initialSessionCountLimit,
- brokerAttributes.get(Broker.CONNECTION_SESSION_COUNT_LIMIT));
+ assertEquals("Unexpected description", null, brokerAttributes.get(Broker.DESCRIPTION));
getRestTestHelper().setUsernameAndPassword(DENIED_USER, DENIED_USER);
- Map<String, Object> newAttributes = new HashMap<String, Object>();
- newAttributes.put(Broker.CONNECTION_SESSION_COUNT_LIMIT, updatedSessionCountLimit);
- int responseCode = getRestTestHelper().submitRequest("broker", "PUT", newAttributes);
- assertEquals("Setting of port attribites should be allowed", 403, responseCode);
+ getRestTestHelper().submitRequest("broker",
+ "PUT",
+ Collections.singletonMap(Broker.DESCRIPTION, "test description"),
+ 403);
brokerAttributes = getRestTestHelper().getJsonAsSingletonList("broker");
- assertEquals("Unexpected default alert repeat gap", initialSessionCountLimit,
- brokerAttributes.get(Broker.CONNECTION_SESSION_COUNT_LIMIT));
+ assertEquals("Unexpected description", null, brokerAttributes.get(Broker.DESCRIPTION));
}
/* === GroupProvider === */
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/systests/src/test/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java b/systests/src/test/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java
index 1a9cbf6..88ecf65 100644
--- a/systests/src/test/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java
+++ b/systests/src/test/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.qpid.test.client;
+import java.util.Collections;
+
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -28,8 +30,10 @@ import javax.jms.Session;
import javax.jms.Topic;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
/**
* @see CloseOnNoRouteForMandatoryMessageTest for related tests
@@ -37,12 +41,17 @@ import org.apache.qpid.test.utils.QpidBrokerTestCase;
public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase
{
private Connection _connection;
- private UnroutableMessageTestExceptionListener _testExceptionListener = new UnroutableMessageTestExceptionListener();
+ private UnroutableMessageTestExceptionListener _testExceptionListener =
+ new UnroutableMessageTestExceptionListener();
@Override
public void setUp() throws Exception
{
- getDefaultBrokerConfiguration().setBrokerAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, false);
+ getDefaultBrokerConfiguration().setObjectAttribute(Port.class,
+ TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT,
+ AmqpPort.CONTEXT,
+ Collections.singletonMap(AmqpPort.CLOSE_WHEN_NO_ROUTE, "false"));
+
super.setUp();
_connection = getConnection();
_connection.setExceptionListener(_testExceptionListener);
@@ -213,7 +222,7 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase
public void testMandatoryAndImmediateSystemProperties() throws Exception
{
- setTestClientSystemProperty("qpid.default_mandatory","true");
+ setTestClientSystemProperty("qpid.default_mandatory", "true");
Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// publish to non-existent topic - should get mandatory failure
@@ -225,7 +234,7 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase
_testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
// now set topic specific system property to false - should no longer get mandatory failure on new producer
- setTestClientSystemProperty("qpid.default_mandatory_topic","false");
+ setTestClientSystemProperty("qpid.default_mandatory_topic", "false");
producer = session.createProducer(null);
message = session.createMessage();
producer.send(session.createTopic(getTestQueueName()), message);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d574dc5/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java b/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
index eb6b403..1a61df8 100644
--- a/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
+++ b/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
@@ -20,14 +20,19 @@
*/
package org.apache.qpid.test.unit.transacted;
+import java.util.HashMap;
+import java.util.Map;
+
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Topic;
-import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Session;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
/**
* This tests the behaviour of transactional sessions when the {@code transactionTimeout} configuration
@@ -43,8 +48,15 @@ public class TransactionTimeoutTest extends TransactionTimeoutTestCase
protected void configure() throws Exception
{
// switch off connection close in order to test timeout on publishing of unroutable messages
- getDefaultBrokerConfiguration().setBrokerAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, false);
- setTestSystemProperty(Session.TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD, "100");
+ Map<String, String> context = new HashMap<>();
+ context.put(AmqpPort.CLOSE_WHEN_NO_ROUTE, "false");
+ context.put(Session.TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD, "100");
+
+ getDefaultBrokerConfiguration().setObjectAttribute(Port.class,
+ TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT,
+ AmqpPort.CONTEXT,
+ context);
+
if (getName().contains("ProducerIdle"))
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org