You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/02/21 21:42:42 UTC
svn commit: r1731562 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/
broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/se...
Author: rgodfrey
Date: Sun Feb 21 20:42:41 2016
New Revision: 1731562
URL: http://svn.apache.org/viewvc?rev=1731562&view=rev
Log:
QPID-6703 : Use connection close lifetime policy for temporary queues where available
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/BindingLoggingTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/QueueLoggingTest.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java Sun Feb 21 20:42:41 2016
@@ -57,14 +57,14 @@ public class QueueArgumentsConverter
public static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability";
- public static final String QPID_TRACE_EXCLUDE = "qpid.trace.exclude";
- public static final String QPID_TRACE_ID = "qpid.trace.id";
-
public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
public static final String QPID_DEFAULT_FILTERS = "qpid.default_filters";
public static final String QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS = "qpid.ensure_nondestructive_consumers";
+
+ public static final String QPID_EXCLUSIVITY_POLICY = "qpid.exclusivity_policy";
+ public static final String QPID_LIFETIME_POLICY = "qpid.lifetime_policy";
/**
* No-local queue argument is used to support the no-local feature of Durable Subscribers.
*/
@@ -99,7 +99,6 @@ public class QueueArgumentsConverter
ATTRIBUTE_MAPPINGS.put(X_QPID_DLQ_ENABLED, AbstractVirtualHost.CREATE_DLQ_ON_CREATION);
ATTRIBUTE_MAPPINGS.put(QPID_GROUP_HEADER_KEY, Queue.MESSAGE_GROUP_KEY);
- //ATTRIBUTE_MAPPINGS.put(QPID_SHARED_MSG_GROUP, Queue.MESSAGE_GROUP_SHARED_GROUPS);
ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_MESSAGE_GROUP_ARG, Queue.MESSAGE_GROUP_DEFAULT_GROUP);
ATTRIBUTE_MAPPINGS.put(QPID_NO_LOCAL, Queue.NO_LOCAL);
@@ -107,6 +106,9 @@ public class QueueArgumentsConverter
ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_FILTERS, Queue.DEFAULT_FILTERS);
ATTRIBUTE_MAPPINGS.put(QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS, Queue.ENSURE_NONDESTRUCTIVE_CONSUMERS);
+ ATTRIBUTE_MAPPINGS.put(QPID_EXCLUSIVITY_POLICY, Queue.EXCLUSIVE);
+ ATTRIBUTE_MAPPINGS.put(QPID_LIFETIME_POLICY, Queue.LIFETIME_POLICY);
+
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Sun Feb 21 20:42:41 2016
@@ -153,6 +153,7 @@ public class ServerConnectionDelegate ex
map.put(ServerPropertyNames.QPID_INSTANCE_NAME, broker.getName());
map.put(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED, String.valueOf(broker.isMessageCompressionEnabled()));
map.put(ConnectionStartProperties.QPID_VIRTUALHOST_PROPERTIES_SUPPORTED, String.valueOf(broker.isVirtualHostPropertiesNodeEnabled()));
+ map.put(ConnectionStartProperties.QPID_QUEUE_LIFETIME_SUPPORTED, Boolean.TRUE.toString());
return map;
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Sun Feb 21 20:42:41 2016
@@ -1455,25 +1455,32 @@ public class ServerSessionDelegate exten
arguments.put(Queue.ID, id);
arguments.put(Queue.NAME, queueName);
- LifetimePolicy lifetime;
- if(autoDelete)
- {
- lifetime = exclusive ? LifetimePolicy.DELETE_ON_SESSION_END
- : LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS;
- }
- else
+
+ if(!arguments.containsKey(Queue.LIFETIME_POLICY))
{
- lifetime = LifetimePolicy.PERMANENT;
+ LifetimePolicy lifetime;
+ if(autoDelete)
+ {
+ lifetime = exclusive ? LifetimePolicy.DELETE_ON_SESSION_END
+ : LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS;
+ }
+ else
+ {
+ lifetime = LifetimePolicy.PERMANENT;
+ }
+ arguments.put(Queue.LIFETIME_POLICY, lifetime);
}
- arguments.put(Queue.LIFETIME_POLICY, lifetime);
-
- ExclusivityPolicy exclusivityPolicy = exclusive ? ExclusivityPolicy.SESSION : ExclusivityPolicy.NONE;
+ if(!arguments.containsKey(Queue.EXCLUSIVE))
+ {
+ ExclusivityPolicy exclusivityPolicy =
+ exclusive ? ExclusivityPolicy.SESSION : ExclusivityPolicy.NONE;
+ arguments.put(Queue.EXCLUSIVE, exclusivityPolicy);
+ }
arguments.put(Queue.DURABLE, method.getDurable());
- arguments.put(Queue.EXCLUSIVE, exclusivityPolicy);
queue = virtualHost.createChild(Queue.class, arguments);
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sun Feb 21 20:42:41 2016
@@ -3306,9 +3306,14 @@ public class AMQChannel
exclusivityPolicy = ExclusivityPolicy.NONE;
}
- attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
- attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
-
+ if(!attributes.containsKey(Queue.EXCLUSIVE))
+ {
+ attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
+ }
+ if(!attributes.containsKey(Queue.LIFETIME_POLICY))
+ {
+ attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
+ }
queue = virtualHost.createChild(Queue.class, attributes);
@@ -3355,7 +3360,7 @@ public class AMQChannel
+ ")");
}
else if ((autoDelete
- && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS)
+ && queue.getLifetimePolicy() == LifetimePolicy.PERMANENT)
|| (!autoDelete && queue.getLifetimePolicy() != ((exclusive
&& !durable)
? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Sun Feb 21 20:42:41 2016
@@ -363,7 +363,7 @@ public class AMQPConnection_0_8
String.valueOf(getBroker().isMessageCompressionEnabled()));
serverProperties.setString(ConnectionStartProperties.QPID_CONFIRMED_PUBLISH_SUPPORTED, Boolean.TRUE.toString());
serverProperties.setString(ConnectionStartProperties.QPID_VIRTUALHOST_PROPERTIES_SUPPORTED, String.valueOf(getBroker().isVirtualHostPropertiesNodeEnabled()));
-
+ serverProperties.setString(ConnectionStartProperties.QPID_QUEUE_LIFETIME_SUPPORTED, Boolean.TRUE.toString());
AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(),
(short) pv.getActualMinorVersion(),
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Sun Feb 21 20:42:41 2016
@@ -85,5 +85,7 @@ public interface AMQConnectionDelegate
boolean isVirtualHostPropertiesSupported();
+ boolean isQueueLifetimePolicySupported();
+
}
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Sun Feb 21 20:42:41 2016
@@ -619,6 +619,12 @@ public class AMQConnectionDelegate_0_10
}
@Override
+ public boolean isQueueLifetimePolicySupported()
+ {
+ return _qpidConnection.isQueueLifetimePolicySupported();
+ }
+
+ @Override
public void setMaxFrameSize(final int frameSize)
{
_conn.setMaximumFrameSize(frameSize);
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Sun Feb 21 20:42:41 2016
@@ -79,6 +79,7 @@ public class AMQConnectionDelegate_8_0 i
private boolean _confirmedPublishSupported;
private boolean _confirmedPublishNonTransactionalSupported;
private boolean _virtualhostPropertiesSupported;
+ private boolean _queueLifetimeSupported;
public void closeConnection(long timeout) throws JMSException, QpidException
{
@@ -172,7 +173,8 @@ public class AMQConnectionDelegate_8_0 i
_virtualhostPropertiesSupported =
checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_VIRTUALHOST_PROPERTIES_SUPPORTED);
-
+ _queueLifetimeSupported =
+ checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_QUEUE_LIFETIME_SUPPORTED);
_confirmedPublishSupported =
checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_CONFIRMED_PUBLISH_SUPPORTED);
_confirmedPublishNonTransactionalSupported = checkConfirmedPublishNonTransactionalSupported();
@@ -515,6 +517,11 @@ public class AMQConnectionDelegate_8_0 i
return _virtualhostPropertiesSupported;
}
+ @Override
+ public boolean isQueueLifetimePolicySupported()
+ {
+ return _queueLifetimeSupported;
+ }
public boolean isAddrSyntaxSupported()
{
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java Sun Feb 21 20:42:41 2016
@@ -1493,8 +1493,18 @@ public abstract class AMQSession<C exten
// this is done so that we can produce to a temporary queue before we create a consumer
result.setQueueName(result.getRoutingKey());
+ Map<String, Object> args;
+ if(_connection.getDelegate().isQueueLifetimePolicySupported())
+ {
+ args = Collections.<String,Object>singletonMap("qpid.lifetime_policy", "DELETE_ON_CONNECTION_CLOSE");
+ }
+ else
+ {
+ args = null;
+ }
createQueue(result.getAMQQueueName(), result.isAutoDelete(),
- result.isDurable(), result.isExclusive());
+ result.isDurable(), result.isExclusive(),
+ args);
bindQueue(result.getAMQQueueName(), result.getRoutingKey(),
new HashMap<String, Object>(), result.getExchangeName(), result);
return result;
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java Sun Feb 21 20:42:41 2016
@@ -62,6 +62,8 @@ public class ConnectionStartProperties
public static final String QPID_CONFIRMED_PUBLISH_SUPPORTED = "qpid.confirmed_publish_supported";
+ public static final String QPID_QUEUE_LIFETIME_SUPPORTED = "qpid.queue_lifetime_supported";
+
public static final int _pid;
public static final String _platformInfo;
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java Sun Feb 21 20:42:41 2016
@@ -81,6 +81,7 @@ public class Connection extends Connecti
private boolean _messageCompressionSupported;
private final AtomicBoolean _redirecting = new AtomicBoolean();
private boolean _virtualHostPropertiesSupported;
+ private boolean _queueLifetimePolicySupported;
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
@@ -686,6 +687,7 @@ public class Connection extends Connecti
_serverProperties = serverProperties == null ? Collections.<String, Object>emptyMap() : serverProperties;
_messageCompressionSupported = Boolean.parseBoolean(String.valueOf(_serverProperties.get(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED)));
_virtualHostPropertiesSupported = Boolean.parseBoolean(String.valueOf(_serverProperties.get(ConnectionStartProperties.QPID_VIRTUALHOST_PROPERTIES_SUPPORTED)));
+ _queueLifetimePolicySupported = Boolean.parseBoolean(String.valueOf(_serverProperties.get(ConnectionStartProperties.QPID_QUEUE_LIFETIME_SUPPORTED)));
}
@@ -851,6 +853,11 @@ public class Connection extends Connecti
return _virtualHostPropertiesSupported;
}
+ public boolean isQueueLifetimePolicySupported()
+ {
+ return _queueLifetimePolicySupported;
+ }
+
public boolean isRedirecting()
{
return _redirecting.get();
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/BindingLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/BindingLoggingTest.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/BindingLoggingTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/BindingLoggingTest.java Sun Feb 21 20:42:41 2016
@@ -174,13 +174,9 @@ public class BindingLoggingTest extends
{
//Closing a consumer on a temporary queue will cause it to autodelete
// and so unbind.
- _session.createConsumer(_session.createTemporaryQueue()).close();
+ _session.createConsumer(_session.createTemporaryQueue());
- if(isBroker010())
- {
- //auto-delete is at session close for 0-10
- _session.close();
- }
+ _connection.close();
//wait for the deletion messages to be logged
waitForMessage("BND-1002");
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/QueueLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/QueueLoggingTest.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/QueueLoggingTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/QueueLoggingTest.java Sun Feb 21 20:42:41 2016
@@ -146,13 +146,9 @@ public class QueueLoggingTest extends Ab
{
// Create a temporary queue so that when we consume from it and
// then close the consumer it will be autoDeleted.
- _session.createConsumer(_session.createTemporaryQueue()).close();
+ _session.createConsumer(_session.createTemporaryQueue());
- if(isBroker010())
- {
- //auto-delete is at session close for 0-10
- _session.close();
- }
+ _connection.close();
// Validation
//Ensure that we wait for the QUE log message
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org