You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/02/21 21:42:42 UTC

svn commit: r1731562 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/queue/ broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/se...

Author: rgodfrey
Date: Sun Feb 21 20:42:41 2016
New Revision: 1731562

URL: http://svn.apache.org/viewvc?rev=1731562&view=rev
Log:
QPID-6703 : Use connection close lifetime policy for temporary queues where available

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/BindingLoggingTest.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/QueueLoggingTest.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java Sun Feb 21 20:42:41 2016
@@ -57,14 +57,14 @@ public class QueueArgumentsConverter
 
     public static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability";
 
-    public static final String QPID_TRACE_EXCLUDE = "qpid.trace.exclude";
-    public static final String QPID_TRACE_ID = "qpid.trace.id";
-
     public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
 
     public static final String QPID_DEFAULT_FILTERS = "qpid.default_filters";
 
     public static final String QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS = "qpid.ensure_nondestructive_consumers";
+
+    public static final String QPID_EXCLUSIVITY_POLICY = "qpid.exclusivity_policy";
+    public static final String QPID_LIFETIME_POLICY = "qpid.lifetime_policy";
     /**
      * No-local queue argument is used to support the no-local feature of Durable Subscribers.
      */
@@ -99,7 +99,6 @@ public class QueueArgumentsConverter
 
         ATTRIBUTE_MAPPINGS.put(X_QPID_DLQ_ENABLED, AbstractVirtualHost.CREATE_DLQ_ON_CREATION);
         ATTRIBUTE_MAPPINGS.put(QPID_GROUP_HEADER_KEY, Queue.MESSAGE_GROUP_KEY);
-        //ATTRIBUTE_MAPPINGS.put(QPID_SHARED_MSG_GROUP, Queue.MESSAGE_GROUP_SHARED_GROUPS);
         ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_MESSAGE_GROUP_ARG, Queue.MESSAGE_GROUP_DEFAULT_GROUP);
 
         ATTRIBUTE_MAPPINGS.put(QPID_NO_LOCAL, Queue.NO_LOCAL);
@@ -107,6 +106,9 @@ public class QueueArgumentsConverter
         ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_FILTERS, Queue.DEFAULT_FILTERS);
         ATTRIBUTE_MAPPINGS.put(QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS, Queue.ENSURE_NONDESTRUCTIVE_CONSUMERS);
 
+        ATTRIBUTE_MAPPINGS.put(QPID_EXCLUSIVITY_POLICY, Queue.EXCLUSIVE);
+        ATTRIBUTE_MAPPINGS.put(QPID_LIFETIME_POLICY, Queue.LIFETIME_POLICY);
+
     }
 
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Sun Feb 21 20:42:41 2016
@@ -153,6 +153,7 @@ public class ServerConnectionDelegate ex
         map.put(ServerPropertyNames.QPID_INSTANCE_NAME, broker.getName());
         map.put(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED, String.valueOf(broker.isMessageCompressionEnabled()));
         map.put(ConnectionStartProperties.QPID_VIRTUALHOST_PROPERTIES_SUPPORTED, String.valueOf(broker.isVirtualHostPropertiesNodeEnabled()));
+        map.put(ConnectionStartProperties.QPID_QUEUE_LIFETIME_SUPPORTED, Boolean.TRUE.toString());
 
         return map;
     }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Sun Feb 21 20:42:41 2016
@@ -1455,25 +1455,32 @@ public class ServerSessionDelegate exten
                 arguments.put(Queue.ID, id);
                 arguments.put(Queue.NAME, queueName);
 
-                LifetimePolicy lifetime;
-                if(autoDelete)
-                {
-                    lifetime = exclusive ? LifetimePolicy.DELETE_ON_SESSION_END
-                            : LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS;
-                }
-                else
+
+                if(!arguments.containsKey(Queue.LIFETIME_POLICY))
                 {
-                    lifetime = LifetimePolicy.PERMANENT;
+                    LifetimePolicy lifetime;
+                    if(autoDelete)
+                    {
+                        lifetime = exclusive ? LifetimePolicy.DELETE_ON_SESSION_END
+                                : LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS;
+                    }
+                    else
+                    {
+                        lifetime = LifetimePolicy.PERMANENT;
+                    }
+                    arguments.put(Queue.LIFETIME_POLICY, lifetime);
                 }
 
-                arguments.put(Queue.LIFETIME_POLICY, lifetime);
-
-                ExclusivityPolicy exclusivityPolicy = exclusive ? ExclusivityPolicy.SESSION : ExclusivityPolicy.NONE;
+                if(!arguments.containsKey(Queue.EXCLUSIVE))
+                {
+                    ExclusivityPolicy exclusivityPolicy =
+                            exclusive ? ExclusivityPolicy.SESSION : ExclusivityPolicy.NONE;
 
+                    arguments.put(Queue.EXCLUSIVE, exclusivityPolicy);
+                }
 
                 arguments.put(Queue.DURABLE, method.getDurable());
 
-                arguments.put(Queue.EXCLUSIVE, exclusivityPolicy);
 
                 queue = virtualHost.createChild(Queue.class, arguments);
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sun Feb 21 20:42:41 2016
@@ -3306,9 +3306,14 @@ public class AMQChannel
                     exclusivityPolicy = ExclusivityPolicy.NONE;
                 }
 
-                attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
-                attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
-
+                if(!attributes.containsKey(Queue.EXCLUSIVE))
+                {
+                    attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
+                }
+                if(!attributes.containsKey(Queue.LIFETIME_POLICY))
+                {
+                    attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
+                }
 
                 queue = virtualHost.createChild(Queue.class, attributes);
 
@@ -3355,7 +3360,7 @@ public class AMQChannel
                                                          + ")");
                 }
                 else if ((autoDelete
-                          && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS)
+                          && queue.getLifetimePolicy() == LifetimePolicy.PERMANENT)
                          || (!autoDelete && queue.getLifetimePolicy() != ((exclusive
                                                                            && !durable)
                         ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Sun Feb 21 20:42:41 2016
@@ -363,7 +363,7 @@ public class AMQPConnection_0_8
                                        String.valueOf(getBroker().isMessageCompressionEnabled()));
             serverProperties.setString(ConnectionStartProperties.QPID_CONFIRMED_PUBLISH_SUPPORTED, Boolean.TRUE.toString());
             serverProperties.setString(ConnectionStartProperties.QPID_VIRTUALHOST_PROPERTIES_SUPPORTED, String.valueOf(getBroker().isVirtualHostPropertiesNodeEnabled()));
-
+            serverProperties.setString(ConnectionStartProperties.QPID_QUEUE_LIFETIME_SUPPORTED, Boolean.TRUE.toString());
 
             AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(),
                                                                                        (short) pv.getActualMinorVersion(),

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Sun Feb 21 20:42:41 2016
@@ -85,5 +85,7 @@ public interface AMQConnectionDelegate
 
     boolean isVirtualHostPropertiesSupported();
 
+    boolean isQueueLifetimePolicySupported();
+
 
 }

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Sun Feb 21 20:42:41 2016
@@ -619,6 +619,12 @@ public class AMQConnectionDelegate_0_10
     }
 
     @Override
+    public boolean isQueueLifetimePolicySupported()
+    {
+        return _qpidConnection.isQueueLifetimePolicySupported();
+    }
+
+    @Override
     public void setMaxFrameSize(final int frameSize)
     {
         _conn.setMaximumFrameSize(frameSize);

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Sun Feb 21 20:42:41 2016
@@ -79,6 +79,7 @@ public class AMQConnectionDelegate_8_0 i
     private boolean _confirmedPublishSupported;
     private boolean _confirmedPublishNonTransactionalSupported;
     private boolean _virtualhostPropertiesSupported;
+    private boolean _queueLifetimeSupported;
 
     public void closeConnection(long timeout) throws JMSException, QpidException
     {
@@ -172,7 +173,8 @@ public class AMQConnectionDelegate_8_0 i
 
                 _virtualhostPropertiesSupported =
                         checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_VIRTUALHOST_PROPERTIES_SUPPORTED);
-
+                _queueLifetimeSupported =
+                        checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_QUEUE_LIFETIME_SUPPORTED);
                 _confirmedPublishSupported =
                         checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_CONFIRMED_PUBLISH_SUPPORTED);
                 _confirmedPublishNonTransactionalSupported = checkConfirmedPublishNonTransactionalSupported();
@@ -515,6 +517,11 @@ public class AMQConnectionDelegate_8_0 i
         return _virtualhostPropertiesSupported;
     }
 
+    @Override
+    public boolean isQueueLifetimePolicySupported()
+    {
+        return _queueLifetimeSupported;
+    }
 
     public boolean isAddrSyntaxSupported()
     {

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java Sun Feb 21 20:42:41 2016
@@ -1493,8 +1493,18 @@ public abstract class AMQSession<C exten
 
             // this is done so that we can produce to a temporary queue before we create a consumer
             result.setQueueName(result.getRoutingKey());
+            Map<String, Object> args;
+            if(_connection.getDelegate().isQueueLifetimePolicySupported())
+            {
+                args = Collections.<String,Object>singletonMap("qpid.lifetime_policy", "DELETE_ON_CONNECTION_CLOSE");
+            }
+            else
+            {
+                args = null;
+            }
             createQueue(result.getAMQQueueName(), result.isAutoDelete(),
-                        result.isDurable(), result.isExclusive());
+                        result.isDurable(), result.isExclusive(),
+                        args);
             bindQueue(result.getAMQQueueName(), result.getRoutingKey(),
                     new HashMap<String, Object>(), result.getExchangeName(), result);
             return result;

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java Sun Feb 21 20:42:41 2016
@@ -62,6 +62,8 @@ public class ConnectionStartProperties
 
     public static final String QPID_CONFIRMED_PUBLISH_SUPPORTED = "qpid.confirmed_publish_supported";
 
+    public static final String QPID_QUEUE_LIFETIME_SUPPORTED = "qpid.queue_lifetime_supported";
+
     public static final int _pid;
 
     public static final String _platformInfo;

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java Sun Feb 21 20:42:41 2016
@@ -81,6 +81,7 @@ public class Connection extends Connecti
     private boolean _messageCompressionSupported;
     private final AtomicBoolean _redirecting = new AtomicBoolean();
     private boolean _virtualHostPropertiesSupported;
+    private boolean _queueLifetimePolicySupported;
 
     public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
 
@@ -686,6 +687,7 @@ public class Connection extends Connecti
         _serverProperties = serverProperties == null ? Collections.<String, Object>emptyMap() : serverProperties;
         _messageCompressionSupported = Boolean.parseBoolean(String.valueOf(_serverProperties.get(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED)));
         _virtualHostPropertiesSupported = Boolean.parseBoolean(String.valueOf(_serverProperties.get(ConnectionStartProperties.QPID_VIRTUALHOST_PROPERTIES_SUPPORTED)));
+        _queueLifetimePolicySupported = Boolean.parseBoolean(String.valueOf(_serverProperties.get(ConnectionStartProperties.QPID_QUEUE_LIFETIME_SUPPORTED)));
 
     }
 
@@ -851,6 +853,11 @@ public class Connection extends Connecti
         return _virtualHostPropertiesSupported;
     }
 
+    public boolean isQueueLifetimePolicySupported()
+    {
+        return _queueLifetimePolicySupported;
+    }
+
     public boolean isRedirecting()
     {
         return _redirecting.get();

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/BindingLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/BindingLoggingTest.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/BindingLoggingTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/BindingLoggingTest.java Sun Feb 21 20:42:41 2016
@@ -174,13 +174,9 @@ public class BindingLoggingTest extends
     {
         //Closing a consumer on a temporary queue will cause it to autodelete
         // and so unbind.
-        _session.createConsumer(_session.createTemporaryQueue()).close();
+        _session.createConsumer(_session.createTemporaryQueue());
 
-        if(isBroker010())
-        {
-            //auto-delete is at session close for 0-10
-            _session.close();
-        }
+        _connection.close();
 
         //wait for the deletion messages to be logged
         waitForMessage("BND-1002");

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/QueueLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/QueueLoggingTest.java?rev=1731562&r1=1731561&r2=1731562&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/QueueLoggingTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/QueueLoggingTest.java Sun Feb 21 20:42:41 2016
@@ -146,13 +146,9 @@ public class QueueLoggingTest extends Ab
        {
            // Create a temporary queue so that when we consume from it and
            // then close the consumer it will be autoDeleted.
-           _session.createConsumer(_session.createTemporaryQueue()).close();
+           _session.createConsumer(_session.createTemporaryQueue());
 
-           if(isBroker010())
-           {
-               //auto-delete is at session close for 0-10
-               _session.close();
-           }
+           _connection.close();
            
            // Validation
            //Ensure that we wait for the QUE log message



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org