You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/17 21:19:38 UTC
svn commit: r1569102 [1/5] - in
/qpid/branches/java-broker-amqp-1-0-management/java:
amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/
bdbstore/src/test/java/org/ap...
Author: rgodfrey
Date: Mon Feb 17 20:19:36 2014
New Revision: 1569102
URL: http://svn.apache.org/r1569102
Log:
Update Queue implementation to better define lifetime and exclusivity policies
Added:
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/ExclusivityPolicy.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/Deletable.java
Modified:
qpid/branches/java-broker-amqp-1-0-management/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
qpid/branches/java-broker-amqp-1-0-management/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
qpid/branches/java-broker-amqp-1-0-management/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
qpid/branches/java-broker-amqp-1-0-management/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java
qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java
qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/systest/rest/QueueRestTest.java
qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
qpid/branches/java-broker-amqp-1-0-management/java/test-profiles/Java010Excludes
qpid/branches/java-broker-amqp-1-0-management/java/test-profiles/JavaPre010Excludes
Modified: qpid/branches/java-broker-amqp-1-0-management/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java Mon Feb 17 20:19:36 2014
@@ -563,7 +563,7 @@ public class ConnectionEndpoint implemen
{
_receivingSessions[channel] = null;
- endpoint.end(end);
+ endpoint.receiveEnd(end);
}
else
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java Mon Feb 17 20:19:36 2014
@@ -153,19 +153,47 @@ public class SessionEndpoint
public void end()
{
- end(null);
+ end(new End());
}
- public void end(final End end)
+ public void end(End end)
{
synchronized(getLock())
{
switch(_state)
{
case BEGIN_SENT:
- _connection.sendEnd(getSendingChannel(), new End(), false);
+ _connection.sendEnd(getSendingChannel(), end, false);
_state = SessionState.END_PIPE;
break;
+ case ACTIVE:
+ detachLinks();
+ short sendChannel = getSendingChannel();
+ _connection.sendEnd(sendChannel, end, true);
+ _state = SessionState.END_SENT;
+ break;
+ default:
+ sendChannel = getSendingChannel();
+ End reply = new End();
+ Error error = new Error();
+ error.setCondition(AmqpError.ILLEGAL_STATE);
+ error.setDescription("END called on Session which has not been opened");
+ reply.setError(error);
+ _connection.sendEnd(sendChannel, reply, true);
+ break;
+
+
+ }
+ getLock().notifyAll();
+ }
+ }
+
+ public void receiveEnd(final End end)
+ {
+ synchronized(getLock())
+ {
+ switch(_state)
+ {
case END_SENT:
_state = SessionState.ENDED;
break;
@@ -174,7 +202,7 @@ public class SessionEndpoint
_sessionEventListener.remoteEnd(end);
short sendChannel = getSendingChannel();
_connection.sendEnd(sendChannel, new End(), true);
- _state = end == null ? SessionState.END_SENT : SessionState.ENDED;
+ _state = SessionState.ENDED;
break;
default:
sendChannel = getSendingChannel();
Modified: qpid/branches/java-broker-amqp-1-0-management/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java Mon Feb 17 20:19:36 2014
@@ -568,7 +568,7 @@ public class UpgradeFrom5To6 extends Abs
Map<String, Object> attributesMap = new HashMap<String, Object>();
attributesMap.put(Exchange.NAME, exchangeName);
attributesMap.put(Exchange.TYPE, exchangeType);
- attributesMap.put(Exchange.LIFETIME_POLICY, autoDelete ? LifetimePolicy.AUTO_DELETE.name()
+ attributesMap.put(Exchange.LIFETIME_POLICY, autoDelete ? "AUTO_DELETE"
: LifetimePolicy.PERMANENT.name());
String json = _serializer.serialize(attributesMap);
UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(Exchange.class.getName(), json);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java Mon Feb 17 20:19:36 2014
@@ -44,6 +44,7 @@ import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java Mon Feb 17 20:19:36 2014
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.server.configuration;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.Map;
import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.Configuration;
@@ -43,7 +45,6 @@ public class QueueConfiguration extends
CompositeConfiguration mungedConf = new CompositeConfiguration();
mungedConf.addConfiguration(_vHostConfig.getConfig().subset("queues.queue." + escapeTagName(name)));
- mungedConf.addConfiguration(_vHostConfig.getConfig().subset("queues"));
setConfiguration("virtualhosts.virtualhost.queues.queue", mungedConf);
}
@@ -85,19 +86,33 @@ public class QueueConfiguration extends
return _vHostConfig;
}
+ private boolean getDefaultedBoolean(String attribute)
+ {
+ final Configuration config = _vHostConfig.getConfig();
+ if(config.containsKey("queues."+attribute))
+ {
+ final boolean defaultValue = config.getBoolean("queues." + attribute);
+ return getBooleanValue(attribute, defaultValue);
+ }
+ else
+ {
+ return getBooleanValue(attribute);
+ }
+ }
+
public boolean getDurable()
{
- return getBooleanValue("durable");
+ return getDefaultedBoolean("boolean");
}
public boolean getExclusive()
{
- return getBooleanValue("exclusive");
+ return getDefaultedBoolean("exclusive");
}
public boolean getAutoDelete()
{
- return getBooleanValue("autodelete");
+ return getDefaultedBoolean("autodelete");
}
public String getOwner()
@@ -107,17 +122,41 @@ public class QueueConfiguration extends
public boolean getPriority()
{
- return getBooleanValue("priority");
+ return getDefaultedBoolean("priority");
}
public int getPriorities()
{
- return getIntValue("priorities", -1);
+ final Configuration config = _vHostConfig.getConfig();
+
+ int defaultValue;
+ if(config.containsKey("queues.priorities"))
+ {
+ defaultValue = config.getInt("queues.priorities");
+ }
+ else
+ {
+ defaultValue = -1;
+ }
+ return getIntValue("priorities", defaultValue);
}
public String getExchange()
{
- return getStringValue("exchange", ExchangeDefaults.DEFAULT_EXCHANGE_NAME);
+ final Configuration config = _vHostConfig.getConfig();
+
+ String defaultValue;
+
+ if(config.containsKey("queues.exchange"))
+ {
+ defaultValue = config.getString("queues.exchange");
+ }
+ else
+ {
+ defaultValue = "";
+ }
+
+ return getStringValue("exchange", defaultValue);
}
public List getRoutingKeys()
@@ -137,37 +176,37 @@ public class QueueConfiguration extends
public int getMaximumMessageAge()
{
- return getIntValue("maximumMessageAge", _vHostConfig.getMaximumMessageAge());
+ return getIntValue("maximumMessageAge");
}
public long getMaximumQueueDepth()
{
- return getLongValue("maximumQueueDepth", _vHostConfig.getMaximumQueueDepth());
+ return getLongValue("maximumQueueDepth");
}
public long getMaximumMessageSize()
{
- return getLongValue("maximumMessageSize", _vHostConfig.getMaximumMessageSize());
+ return getLongValue("maximumMessageSize");
}
public long getMaximumMessageCount()
{
- return getLongValue("maximumMessageCount", _vHostConfig.getMaximumMessageCount());
+ return getLongValue("maximumMessageCount");
}
public long getMinimumAlertRepeatGap()
{
- return getLongValue("minimumAlertRepeatGap", _vHostConfig.getMinimumAlertRepeatGap());
+ return getLongValue("minimumAlertRepeatGap");
}
public long getCapacity()
{
- return getLongValue("capacity", _vHostConfig.getCapacity());
+ return getLongValue("capacity");
}
public long getFlowResumeCapacity()
{
- return getLongValue("flowResumeCapacity", _vHostConfig.getFlowResumeCapacity());
+ return getLongValue("flowResumeCapacity");
}
public boolean isLVQ()
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java Mon Feb 17 20:19:36 2014
@@ -22,6 +22,7 @@
package org.apache.qpid.server.filter;
import java.lang.ref.WeakReference;
+import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;
@@ -29,6 +30,7 @@ import org.apache.qpid.common.AMQPFilter
import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.TokenMgrError;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
@@ -120,19 +122,25 @@ public class FilterSupport
public static final class NoLocalFilter implements MessageFilter
{
- private final MessageSource _queue;
+ private final MessageSource<?,?> _queue;
- public NoLocalFilter(MessageSource queue)
+ private NoLocalFilter(MessageSource queue)
{
_queue = queue;
}
public boolean matches(Filterable message)
{
- final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession();
- return exclusiveOwningSession == null ||
- exclusiveOwningSession.getConnectionReference() != message.getConnectionReference();
+ final Collection<? extends Consumer> consumers = _queue.getConsumers();
+ for(Consumer c : consumers)
+ {
+ if(c.getSessionModel().getConnectionReference() == message.getConnectionReference())
+ {
+ return false;
+ }
+ }
+ return !consumers.isEmpty();
}
@Override
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java Mon Feb 17 20:19:36 2014
@@ -46,7 +46,7 @@ public class ChannelLogSubject extends A
AMQConnectionModel connection = session.getConnectionModel();
setLogStringWithFormat(CHANNEL_FORMAT,
connection == null ? -1L : connection.getConnectionId(),
- (connection == null || connection.getPrincipalAsString() == null) ? "?" : connection.getPrincipalAsString(),
+ (connection == null || connection.getAuthorizedPrincipal() == null) ? "?" : connection.getAuthorizedPrincipal().getName(),
(connection == null || connection.getRemoteAddressString() == null) ? "?" : connection.getRemoteAddressString(),
(connection == null || connection.getVirtualHostName() == null) ? "?" : connection.getVirtualHostName(),
session.getChannelId());
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java Mon Feb 17 20:19:36 2014
@@ -55,7 +55,7 @@ public class ConnectionLogSubject extend
{
if (!_upToDate)
{
- if (_session.getPrincipalAsString() != null)
+ if (_session.getAuthorizedPrincipal() != null)
{
if (_session.getVirtualHostName() != null)
{
@@ -71,7 +71,7 @@ public class ConnectionLogSubject extend
*/
setLogString("[" + MessageFormat.format(CONNECTION_FORMAT,
_session.getConnectionId(),
- _session.getPrincipalAsString(),
+ _session.getAuthorizedPrincipal().getName(),
_session.getRemoteAddressString(),
_session.getVirtualHostName())
+ "] ");
@@ -82,7 +82,7 @@ public class ConnectionLogSubject extend
{
setLogString("[" + MessageFormat.format(USER_FORMAT,
_session.getConnectionId(),
- _session.getPrincipalAsString(),
+ _session.getAuthorizedPrincipal().getName(),
_session.getRemoteAddressString())
+ "] ");
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java Mon Feb 17 20:19:36 2014
@@ -36,7 +36,8 @@ public interface MessageSource<C extends
<T extends ConsumerTarget> C addConsumer(T target, FilterManager filters,
Class<? extends ServerMessage> messageClass,
String consumerName, EnumSet<Consumer.Option> options)
- throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, QpidSecurityException;
+ throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, QpidSecurityException,
+ ConsumerAccessRefused;
Collection<C> getConsumers();
@@ -44,16 +45,10 @@ public interface MessageSource<C extends
void removeConsumerRegistrationListener(ConsumerRegistrationListener<S> listener);
- AuthorizationHolder getAuthorizationHolder();
-
- void setAuthorizationHolder(AuthorizationHolder principalHolder);
-
- void setExclusiveOwningSession(AMQSessionModel owner);
-
- AMQSessionModel getExclusiveOwningSession();
-
boolean isExclusive();
+ boolean verifySessionAccess(AMQSessionModel<?,?> session);
+
interface ConsumerRegistrationListener<Q extends MessageSource<? extends Consumer,?>>
{
void consumerAdded(Q source, Consumer consumer);
@@ -76,7 +71,6 @@ public interface MessageSource<C extends
public ExistingExclusiveConsumer()
{
- super("");
}
}
@@ -95,7 +89,15 @@ public interface MessageSource<C extends
{
public ExistingConsumerPreventsExclusive()
{
- super("");
}
}
+
+ static final class ConsumerAccessRefused extends Exception
+ {
+ public ConsumerAccessRefused()
+ {
+ }
+ }
+
+
}
Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/ExclusivityPolicy.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/ExclusivityPolicy.java?rev=1569102&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/ExclusivityPolicy.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/ExclusivityPolicy.java Mon Feb 17 20:19:36 2014
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+public enum ExclusivityPolicy
+{
+ NONE,
+ SESSION,
+ CONNECTION,
+ CONTAINER,
+ PRINCIPAL,
+ LINK
+}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java Mon Feb 17 20:19:36 2014
@@ -23,5 +23,9 @@ package org.apache.qpid.server.model;
public enum LifetimePolicy
{
PERMANENT,
- AUTO_DELETE
+ DELETE_ON_CONNECTION_CLOSE,
+ DELETE_ON_SESSION_END,
+ DELETE_ON_NO_OUTBOUND_LINKS,
+ DELETE_ON_NO_LINKS,
+ IN_USE
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Mon Feb 17 20:19:36 2014
@@ -160,7 +160,7 @@ public interface VirtualHost extends Con
QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
CONFIG_PATH));
- int CURRENT_CONFIG_VERSION = 3;
+ int CURRENT_CONFIG_VERSION = 4;
//children
Collection<VirtualHostAlias> getAliases();
@@ -172,9 +172,8 @@ public interface VirtualHost extends Con
LifetimePolicy lifetime, long ttl, String type, Map<String, Object> attributes)
throws AccessControlException, IllegalArgumentException;
- Queue createQueue(String name, State initialState, boolean durable,
- boolean exclusive, LifetimePolicy lifetime, long ttl, Map<String, Object> attributes)
- throws AccessControlException, IllegalArgumentException;
+ Queue createQueue(Map<String, Object> attributes)
+ throws AccessControlException, IllegalArgumentException;
Collection<String> getExchangeTypes();
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java Mon Feb 17 20:19:36 2014
@@ -170,7 +170,7 @@ final class BindingAdapter extends Abstr
}
else if(LIFETIME_POLICY.equals(name))
{
- return _queue.getLifetimePolicy() == LifetimePolicy.AUTO_DELETE || _exchange.getLifetimePolicy() == LifetimePolicy.AUTO_DELETE ? LifetimePolicy.AUTO_DELETE : LifetimePolicy.PERMANENT;
+ return _queue.getLifetimePolicy() != LifetimePolicy.PERMANENT || _exchange.getLifetimePolicy() != LifetimePolicy.PERMANENT ? LifetimePolicy.IN_USE : LifetimePolicy.PERMANENT;
}
else if(TIME_TO_LIVE.equals(name))
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java Mon Feb 17 20:19:36 2014
@@ -21,6 +21,7 @@
package org.apache.qpid.server.model.adapter;
import java.security.AccessControlException;
+import java.security.Principal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -191,7 +192,8 @@ final class ConnectionAdapter extends Ab
}
else if(name.equals(PRINCIPAL))
{
- return _connection.getPrincipalAsString();
+ final Principal authorizedPrincipal = _connection.getAuthorizedPrincipal();
+ return authorizedPrincipal == null ? null : authorizedPrincipal.getName();
}
else if(name.equals(PROPERTIES))
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java Mon Feb 17 20:19:36 2014
@@ -130,7 +130,7 @@ public class ConsumerAdapter extends Abs
}
else if(LIFETIME_POLICY.equals(name))
{
- return LifetimePolicy.AUTO_DELETE;
+ return LifetimePolicy.DELETE_ON_SESSION_END;
}
else if(TIME_TO_LIVE.equals(name))
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java Mon Feb 17 20:19:36 2014
@@ -201,7 +201,7 @@ final class ExchangeAdapter extends Abst
public LifetimePolicy getLifetimePolicy()
{
- return _exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE : LifetimePolicy.PERMANENT;
+ return _exchange.isAutoDelete() ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT;
}
public LifetimePolicy setLifetimePolicy(final LifetimePolicy expected, final LifetimePolicy desired)
@@ -330,7 +330,7 @@ final class ExchangeAdapter extends Abst
}
else if(LIFETIME_POLICY.equals(name))
{
- return _exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE : LifetimePolicy.PERMANENT;
+ return _exchange.isAutoDelete() ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT;
}
else if(TIME_TO_LIVE.equals(name))
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java Mon Feb 17 20:19:36 2014
@@ -31,15 +31,7 @@ import java.util.Map;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.message.MessageSource;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.ConfiguredObjectFinder;
-import org.apache.qpid.server.model.Exchange;
-import org.apache.qpid.server.model.IllegalStateTransitionException;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.QueueNotificationListener;
-import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.Statistics;
+import org.apache.qpid.server.model.*;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.*;
@@ -62,7 +54,6 @@ final class QueueAdapter<Q extends AMQQu
put(QUEUE_FLOW_CONTROL_SIZE_BYTES, Long.class);
put(QUEUE_FLOW_RESUME_SIZE_BYTES, Long.class);
put(MAXIMUM_DELIVERY_ATTEMPTS, Integer.class);
- put(EXCLUSIVE, Boolean.class);
put(DESCRIPTION, String.class);
}});
@@ -208,7 +199,7 @@ final class QueueAdapter<Q extends AMQQu
public LifetimePolicy getLifetimePolicy()
{
- return _queue.isAutoDelete() ? LifetimePolicy.AUTO_DELETE : LifetimePolicy.PERMANENT;
+ return _queue.getLifetimePolicy();
}
public LifetimePolicy setLifetimePolicy(final LifetimePolicy expected, final LifetimePolicy desired)
@@ -274,9 +265,33 @@ final class QueueAdapter<Q extends AMQQu
}
else if(EXCLUSIVE.equals(name))
{
- Boolean exclusiveFlag = (Boolean) desired;
- _queue.setExclusive(exclusiveFlag);
+ ExclusivityPolicy desiredPolicy;
+ if(desired == null)
+ {
+ desiredPolicy = ExclusivityPolicy.NONE;
+ }
+ else if(desired instanceof ExclusivityPolicy)
+ {
+ desiredPolicy = (ExclusivityPolicy)desired;
+ }
+ else if (desired instanceof String)
+ {
+ desiredPolicy = ExclusivityPolicy.valueOf((String)desired);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Cannot set " + Queue.EXCLUSIVE + " property to type " + desired.getClass().getName());
+ }
+ try
+ {
+ _queue.setExclusivityPolicy(desiredPolicy);
+ }
+ catch (MessageSource.ExistingConsumerPreventsExclusive existingConsumerPreventsExclusive)
+ {
+ throw new IllegalArgumentException("Unable to set exclusivity policy to " + desired + " as an existing combinations of consumers prevents this");
+ }
return true;
+
}
else if(MESSAGE_GROUP_KEY.equals(name))
{
@@ -376,7 +391,7 @@ final class QueueAdapter<Q extends AMQQu
}
else if(EXCLUSIVE.equals(name))
{
- return _queue.isExclusive();
+ return _queue.getAttribute(Queue.EXCLUSIVE);
}
else if(MESSAGE_GROUP_KEY.equals(name))
{
@@ -458,7 +473,7 @@ final class QueueAdapter<Q extends AMQQu
}
else if(LIFETIME_POLICY.equals(name))
{
- return _queue.isAutoDelete() ? LifetimePolicy.AUTO_DELETE : LifetimePolicy.PERMANENT;
+ return _queue.getLifetimePolicy();
}
else if(NAME.equals(name))
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java Mon Feb 17 20:19:36 2014
@@ -23,7 +23,6 @@ package org.apache.qpid.server.model.ada
import java.io.File;
import java.lang.reflect.Type;
import java.security.AccessControlException;
-import java.security.Principal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -58,18 +57,16 @@ import org.apache.qpid.server.model.Queu
import org.apache.qpid.server.model.QueueType;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
-import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostAlias;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.ConflationQueue;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
-import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.LocalTransaction;
@@ -356,7 +353,7 @@ public final class VirtualHostAdapter ex
name,
type,
durable,
- lifetime == LifetimePolicy.AUTO_DELETE,
+ lifetime != null && lifetime != LifetimePolicy.PERMANENT,
alternateExchange);
synchronized (_exchangeAdapters)
{
@@ -389,7 +386,7 @@ public final class VirtualHostAdapter ex
public Queue createQueue(Map<String, Object> attributes)
throws AccessControlException, IllegalArgumentException
{
- attributes = new HashMap<String, Object>(attributes);
+ checkVHostStateIsActive();
if (attributes.containsKey(Queue.QUEUE_TYPE))
{
@@ -405,7 +402,7 @@ public final class VirtualHostAdapter ex
}
if (queueType == QueueType.LVQ && attributes.get(Queue.LVQ_KEY) == null)
{
- attributes.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_DEFAULT_LVQ_KEY);
+ attributes.put(Queue.LVQ_KEY, ConflationQueue.DEFAULT_LVQ_KEY);
}
else if (queueType == QueueType.PRIORITY && attributes.get(Queue.PRIORITIES) == null)
{
@@ -417,51 +414,12 @@ public final class VirtualHostAdapter ex
}
}
- String name = MapValueConverter.getStringAttribute(Queue.NAME, attributes, null);
- State state = MapValueConverter.getEnumAttribute(State.class, Queue.STATE, attributes, State.ACTIVE);
- boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE, attributes, false);
- LifetimePolicy lifetime = MapValueConverter.getEnumAttribute(LifetimePolicy.class, Queue.LIFETIME_POLICY, attributes, LifetimePolicy.PERMANENT);
- long ttl = MapValueConverter.getLongAttribute(Queue.TIME_TO_LIVE, attributes, 0l);
- boolean exclusive= MapValueConverter.getBooleanAttribute(Queue.EXCLUSIVE, attributes, false);
-
- attributes.remove(Queue.NAME);
- attributes.remove(Queue.STATE);
- attributes.remove(Queue.DURABLE);
- attributes.remove(Queue.LIFETIME_POLICY);
- attributes.remove(Queue.TIME_TO_LIVE);
-
- return createQueue(name, state, durable, exclusive, lifetime, ttl, attributes);
- }
-
- public Queue createQueue(final String name,
- final State initialState,
- final boolean durable,
- boolean exclusive,
- final LifetimePolicy lifetime,
- final long ttl,
- final Map<String, Object> attributes)
- throws AccessControlException, IllegalArgumentException
- {
- checkVHostStateIsActive();
-
- String owner = null;
- if(exclusive)
- {
- Principal authenticatedPrincipal = AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(SecurityManager.getThreadSubject());
- if(authenticatedPrincipal != null)
- {
- owner = authenticatedPrincipal.getName();
- }
- }
- final boolean autoDelete = lifetime == LifetimePolicy.AUTO_DELETE;
try
{
- AMQQueue queue =
- _virtualHost.createQueue(UUIDGenerator.generateQueueUUID(name, _virtualHost.getName()), name,
- durable, owner, autoDelete, exclusive, autoDelete && exclusive, attributes);
+ AMQQueue queue = _virtualHost.createQueue(null, attributes);
synchronized (_queueAdapters)
{
@@ -471,15 +429,15 @@ public final class VirtualHostAdapter ex
}
catch(QueueExistsException qe)
{
- throw new IllegalArgumentException("Queue with name "+name+" already exists");
+ throw new IllegalArgumentException("Queue with name "+MapValueConverter.getStringAttribute(Queue.NAME,attributes)+" already exists");
}
catch (QpidSecurityException e)
{
throw new AccessControlException(e.toString());
}
-
}
+
public String getName()
{
return (String)getAttribute(NAME);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Mon Feb 17 20:19:36 2014
@@ -25,11 +25,13 @@ import org.apache.qpid.server.logging.Lo
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.util.Deletable;
+import java.security.Principal;
import java.util.List;
import java.util.UUID;
-public interface AMQConnectionModel extends StatisticsGatherer
+public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends AMQSessionModel<S,T>> extends StatisticsGatherer, Deletable<T>
{
/**
* Close the underlying Connection
@@ -50,7 +52,7 @@ public interface AMQConnectionModel exte
* @param cause
* @param message
*/
- public void closeSession(AMQSessionModel session, AMQConstant cause, String message);
+ public void closeSession(S session, AMQConstant cause, String message);
public long getConnectionId();
@@ -59,15 +61,13 @@ public interface AMQConnectionModel exte
*
* @return a list of {@link AMQSessionModel}s
*/
- public List<AMQSessionModel> getSessionModels();
+ public List<S> getSessionModels();
/**
* Return a {@link LogSubject} for the connection.
*/
public LogSubject getLogSubject();
- public String getUserName();
-
public boolean isSessionNameUnique(byte[] name);
String getRemoteAddressString();
@@ -78,7 +78,7 @@ public interface AMQConnectionModel exte
String getClientProduct();
- String getPrincipalAsString();
+ Principal getAuthorizedPrincipal();
long getSessionCountLimit();
@@ -93,4 +93,6 @@ public interface AMQConnectionModel exte
boolean isStopped();
String getVirtualHostName();
+
+ String getRemoteContainerName();
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Mon Feb 17 20:19:36 2014
@@ -26,17 +26,18 @@ import java.util.concurrent.ConcurrentSk
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.util.Deletable;
/**
* Session model interface.
* Extends {@link Comparable} to allow objects to be inserted into a {@link ConcurrentSkipListSet}
* when monitoring the blocking and blocking of queues/sessions in {@link AMQQueue}.
*/
-public interface AMQSessionModel extends Comparable<AMQSessionModel>
+public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQConnectionModel<C,T>> extends Comparable<T>, Deletable<T>
{
public UUID getId();
- public AMQConnectionModel getConnectionModel();
+ public C getConnectionModel();
public String getClientID();
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Mon Feb 17 20:19:36 2014
@@ -27,10 +27,13 @@ import org.apache.qpid.server.exchange.E
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.Deletable;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -38,9 +41,12 @@ import java.util.List;
import java.util.Set;
public interface AMQQueue<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, C extends Consumer>
- extends Comparable<Q>, ExchangeReferrer, BaseQueue<C>, MessageSource<C,Q>, CapacityChecker, MessageDestination
+ extends Comparable<Q>, ExchangeReferrer, BaseQueue<C>, MessageSource<C,Q>, CapacityChecker, MessageDestination,
+ Deletable<Q>
{
+ void setExclusivityPolicy(ExclusivityPolicy desiredPolicy) throws ExistingConsumerPreventsExclusive;
+
public interface NotificationListener
{
void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg);
@@ -66,9 +72,7 @@ public interface AMQQueue<E extends Queu
long getTotalEnqueueCount();
- void setNoLocal(boolean b);
-
- boolean isAutoDelete();
+ LifetimePolicy getLifetimePolicy();
String getOwner();
@@ -104,11 +108,6 @@ public interface AMQQueue<E extends Queu
boolean resend(final E entry, final C consumer);
- void addQueueDeleteTask(Action<AMQQueue> task);
- void removeQueueDeleteTask(Action<AMQQueue> task);
-
-
-
List<E> getMessagesOnTheQueue();
List<Long> getMessagesOnTheQueue(int num);
@@ -189,10 +188,6 @@ public interface AMQQueue<E extends Queu
Collection<String> getAvailableAttributes();
Object getAttribute(String attrName);
- void configure(QueueConfiguration config);
-
- void setExclusive(boolean exclusive);
-
/**
* Gets the maximum delivery count. If a message on this queue
* is delivered more than maximumDeliveryCount, the message will be
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org