You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2012/09/13 22:23:21 UTC

svn commit: r1384512 - in /qpid/trunk/qpid/java/client/src: main/java/org/apache/qpid/client/AMQSession_0_8.java test/java/org/apache/qpid/test/unit/message/TestAMQSession.java

Author: kwall
Date: Thu Sep 13 20:23:21 2012
New Revision: 1384512

URL: http://svn.apache.org/viewvc?rev=1384512&view=rev
Log:
QPID-4302: AMQP 0-8..0-9-1 client should sync after message.acknowledge() (enabled by default; set system property qpid.sync_after_client.ack=false to disable)

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1384512&r1=1384511&r2=1384512&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Sep 13 20:23:21 2012
@@ -64,6 +64,11 @@ public class AMQSession_0_8 extends AMQS
     /** Used for debugging. */
     private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
 
+    public static final String QPID_SYNC_AFTER_CLIENT_ACK = "qpid.sync_after_client.ack";
+
+    private final boolean _syncAfterClientAck =
+            Boolean.parseBoolean(System.getProperty(QPID_SYNC_AFTER_CLIENT_ACK, "true"));
+
     /**
      * The period to wait while flow controlled before sending a log message confirming that the session is still
      * waiting on flow control being revoked
@@ -120,8 +125,9 @@ public class AMQSession_0_8 extends AMQS
         return getProtocolHandler().getProtocolVersion();
     }
 
-    protected void acknowledgeImpl()
+    protected void acknowledgeImpl() throws JMSException
     {
+        boolean syncRequired = false;
         while (true)
         {
             Long tag = getUnacknowledgedMessageTags().poll();
@@ -131,6 +137,19 @@ public class AMQSession_0_8 extends AMQS
             }
 
             acknowledgeMessage(tag, false);
+            syncRequired = true;
+        }
+
+        try
+        {
+            if (syncRequired && _syncAfterClientAck)
+            {
+                sync();
+            }
+        }
+        catch (AMQException a)
+        {
+            throw new JMSAMQException("Failed to sync after acknowledge", a);
         }
     }
 
@@ -681,7 +700,7 @@ public class AMQSession_0_8 extends AMQS
                                               boolean noLocal,
                                               boolean noWait) throws AMQException
     {
-        throw new UnsupportedOperationException("The new addressing based sytanx is "
+        throw new UnsupportedOperationException("The new addressing based syntax is "
                 + "not supported for AMQP 0-8/0-9 versions");
     }
     

Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java?rev=1384512&r1=1384511&r2=1384512&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java Thu Sep 13 20:23:21 2012
@@ -189,14 +189,6 @@ public class TestAMQSession extends AMQS
     {
     }
 
-    public void handleAddressBasedDestination(AMQDestination dest, 
-                                              boolean isConsumer, 
-                                              boolean noWait) throws AMQException
-    {
-        throw new UnsupportedOperationException("The new addressing based sytanx is "
-                + "not supported for AMQP 0-8/0-9 versions");
-    }
-
     @Override
     protected void flushAcknowledgments()
     {      



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org