You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/04/26 18:08:40 UTC

svn commit: r532788 - in /incubator/qpid/trunk/qpid: ./ java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Author: ritchiem
Date: Thu Apr 26 09:08:39 2007
New Revision: 532788

URL: http://svn.apache.org/viewvc?view=rev&rev=532788
Log:
Merged revisions 532786 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2

........
  r532786 | ritchiem | 2007-04-26 16:59:24 +0100 (Thu, 26 Apr 2007) | 3 lines
  
  QPID-466 Create STRICT_AMQP System property to disable JMS extensions in Java client. 
  
  This disables the JMS features that rely upon Qpid Java broker specific features.
........

Modified:
    incubator/qpid/trunk/qpid/   (props changed)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=532788&r1=532787&r2=532788
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Apr 26 09:08:39 2007
@@ -202,11 +202,20 @@
     /** Boolean to control immediate prefetch . Records the first call to the dispatcher to prevent further flow(true) */
     private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
 
+    /** System property to enable strickt AMQP compliance */
+    public static final String STRICT_AMQP = "STRICT_AMQP";
+    /** Strickt AMQP default */
+    public static final String STRICT_AMQP_DEFAULT = "false";
+
+    private final boolean _strictAMQP;
+
+
     /** System property to enable immediate message prefetching */
     public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
     /** Immediate message prefetch default */
     public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
 
+    private final boolean _immediatePrefetch;
 
     private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class);
 
@@ -435,6 +444,10 @@
     AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
                MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
     {
+
+        _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
+        _immediatePrefetch = Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
+
         _connection = con;
         _transacted = transacted;
         if (transacted)
@@ -921,15 +934,27 @@
                 _dispatcher.rollback();
             }
 
-            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId,
-                                                                                       getProtocolMajorVersion(),
-                                                                                       getProtocolMinorVersion(),
-                                                                                       false)    // requeue
-                    , BasicRecoverOkBody.class);
+            if (isStrictAMQP())
+            {
+                // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
+                _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
+                                                                                            getProtocolMajorVersion(),
+                                                                                            getProtocolMinorVersion(),
+                                                                                            false));    // requeue
+                _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");                
+            }
+            else
+            {
 
+                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                // Be aware of possible changes to parameter order as versions change.
+                _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId,
+                                                                                           getProtocolMajorVersion(),
+                                                                                           getProtocolMinorVersion(),
+                                                                                           false)    // requeue
+                        , BasicRecoverOkBody.class);
+            }
             if (!isSuspended)
             {
                 suspendChannel(false);
@@ -1433,7 +1458,6 @@
     private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
                                   boolean nowait, String messageSelector) throws AMQException
     {
-        //fixme prefetch values are not used here. Do we need to have them as parametsrs?
         //need to generate a consumer tag on the client so we can exploit the nowait flag
         AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
 
@@ -1709,11 +1733,21 @@
 
     public QueueBrowser createBrowser(Queue queue) throws JMSException
     {
+        if (isStrictAMQP())
+        {
+            throw new UnsupportedOperationException();
+        }
+
         return createBrowser(queue, null);
     }
 
     public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
     {
+        if (isStrictAMQP())
+        {
+            throw new UnsupportedOperationException();
+        }
+
         checkNotClosed();
         checkValidQueue(queue);
         return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
@@ -1762,6 +1796,11 @@
 
     boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException
     {
+        if (isStrictAMQP())
+        {
+            throw new UnsupportedOperationException();
+        }
+
         // TODO: Be aware of possible changes to parameter order as versions change.
         AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
                                                                getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version (major, minor)
@@ -1940,7 +1979,7 @@
     synchronized void startDistpatcherIfNecessary()
     {
         // If IMMEDIATE_PREFETCH is not set then we need to start fetching          
-        if (!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)))
+        if (!_immediatePrefetch)
         {
             // We do this now if this is the first call on a started connection
             if (isSuspended() && _startedAtLeastOnce.get() && _firstDispatcher.getAndSet(false))
@@ -2005,7 +2044,7 @@
         bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
 
         // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
-        if (!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)))
+        if (!_immediatePrefetch)
         {
             // The dispatcher will be null if we have just created this session
             // so suspend the channel before we register our consumer so that we don't
@@ -2390,6 +2429,11 @@
 
             _connection.getProtocolHandler().writeFrame(basicRejectBody);
         }
+    }
+
+    public boolean isStrictAMQP()
+    {
+        return _strictAMQP;
     }
 
 }