You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2012/09/13 22:23:21 UTC
svn commit: r1384512 - in /qpid/trunk/qpid/java/client/src:
main/java/org/apache/qpid/client/AMQSession_0_8.java
test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
Author: kwall
Date: Thu Sep 13 20:23:21 2012
New Revision: 1384512
URL: http://svn.apache.org/viewvc?rev=1384512&view=rev
Log:
QPID-4302: 0-8..0-9-1 client should sync after message.acknowledge()
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1384512&r1=1384511&r2=1384512&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Sep 13 20:23:21 2012
@@ -64,6 +64,11 @@ public class AMQSession_0_8 extends AMQS
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
+ public static final String QPID_SYNC_AFTER_CLIENT_ACK = "qpid.sync_after_client.ack";
+
+ private final boolean _syncAfterClientAck =
+ Boolean.parseBoolean(System.getProperty(QPID_SYNC_AFTER_CLIENT_ACK, "true"));
+
/**
* The period to wait while flow controlled before sending a log message confirming that the session is still
* waiting on flow control being revoked
@@ -120,8 +125,9 @@ public class AMQSession_0_8 extends AMQS
return getProtocolHandler().getProtocolVersion();
}
- protected void acknowledgeImpl()
+ protected void acknowledgeImpl() throws JMSException
{
+ boolean syncRequired = false;
while (true)
{
Long tag = getUnacknowledgedMessageTags().poll();
@@ -131,6 +137,19 @@ public class AMQSession_0_8 extends AMQS
}
acknowledgeMessage(tag, false);
+ syncRequired = true;
+ }
+
+ try
+ {
+ if (syncRequired && _syncAfterClientAck)
+ {
+ sync();
+ }
+ }
+ catch (AMQException a)
+ {
+ throw new JMSAMQException("Failed to sync after acknowledge", a);
}
}
@@ -681,7 +700,7 @@ public class AMQSession_0_8 extends AMQS
boolean noLocal,
boolean noWait) throws AMQException
{
- throw new UnsupportedOperationException("The new addressing based sytanx is "
+ throw new UnsupportedOperationException("The new addressing based syntax is "
+ "not supported for AMQP 0-8/0-9 versions");
}
Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java?rev=1384512&r1=1384511&r2=1384512&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java Thu Sep 13 20:23:21 2012
@@ -189,14 +189,6 @@ public class TestAMQSession extends AMQS
{
}
- public void handleAddressBasedDestination(AMQDestination dest,
- boolean isConsumer,
- boolean noWait) throws AMQException
- {
- throw new UnsupportedOperationException("The new addressing based sytanx is "
- + "not supported for AMQP 0-8/0-9 versions");
- }
-
@Override
protected void flushAcknowledgments()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org