You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/06/16 23:04:01 UTC
svn commit: r668311 - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client:
AMQConnectionDelegate_0_10.java AMQSession.java AMQSession_0_10.java
AMQSession_0_8.java
Author: rhs
Date: Mon Jun 16 14:04:01 2008
New Revision: 668311
URL: http://svn.apache.org/viewvc?rev=668311&view=rev
Log:
QPID-1139: use RFC1982 comparisons for rollback mark and update rollback mark to track dispatched messages
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=668311&r1=668310&r2=668311&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Mon Jun 16 14:04:01 2008
@@ -63,6 +63,7 @@
}
catch (Exception e)
{
+ _logger.error("exception creating session:", e);
throw new JMSAMQException("cannot create session", e);
}
return session;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=668311&r1=668310&r2=668311&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Jun 16 14:04:01 2008
@@ -2745,6 +2745,8 @@
{
while (!_closed.get() && ((message = (UnprocessedMessage) _queue.take()) != null))
{
+ long deliveryTag = message.getDeliveryTag();
+
synchronized (_lock)
{
@@ -2753,27 +2755,24 @@
_lock.wait();
}
- if (message.getDeliveryTag() <= _rollbackMark.get())
+ if (tagLE(deliveryTag, _rollbackMark.get()))
{
rejectMessage(message, true);
}
else
{
- if (message.getDeliveryTag() <= _rollbackMark.get())
- {
- rejectMessage(message, true);
- }
- else
+ synchronized (_messageDeliveryLock)
{
- synchronized (_messageDeliveryLock)
- {
- dispatchMessage(message);
- }
+ dispatchMessage(message);
}
}
-
}
+ long current = _rollbackMark.get();
+ if (updateRollbackMark(current, deliveryTag))
+ {
+ _rollbackMark.compareAndSet(current, deliveryTag);
+ }
}
}
catch (InterruptedException e)
@@ -2851,6 +2850,10 @@
}
}
+ abstract boolean tagLE(long tag1, long tag2);
+
+ abstract boolean updateRollbackMark(long current, long deliveryTag);
+
/*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write,
boolean read) throws AMQException
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=668311&r1=668310&r2=668311&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Mon Jun 16 14:04:01 2008
@@ -27,6 +27,7 @@
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.util.Serial;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
import org.apache.qpidity.ErrorCode;
@@ -785,4 +786,15 @@
throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
}
}
+
+ final boolean tagLE(long tag1, long tag2)
+ {
+ return Serial.le((int) tag1, (int) tag2);
+ }
+
+ final boolean updateRollbackMark(long currentMark, long deliveryTag)
+ {
+ return Serial.lt((int) currentMark, (int) deliveryTag);
+ }
+
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=668311&r1=668310&r2=668311&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Mon Jun 16 14:04:01 2008
@@ -40,7 +40,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class AMQSession_0_8 extends AMQSession
+public final class AMQSession_0_8 extends AMQSession
{
/** Used for debugging. */
@@ -453,4 +453,14 @@
return okHandler._messageCount;
}
+ final boolean tagLE(long tag1, long tag2)
+ {
+ return tag1 <= tag2;
+ }
+
+ final boolean updateRollbackMark(long currentMark, long deliveryTag)
+ {
+ return false;
+ }
+
}