You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/06/16 23:04:01 UTC

svn commit: r668311 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: AMQConnectionDelegate_0_10.java AMQSession.java AMQSession_0_10.java AMQSession_0_8.java

Author: rhs
Date: Mon Jun 16 14:04:01 2008
New Revision: 668311

URL: http://svn.apache.org/viewvc?rev=668311&view=rev
Log:
QPID-1139: use RFC1982 comparisons for rollback mark and update rollback mark to track dispatched messages

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=668311&r1=668310&r2=668311&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Mon Jun 16 14:04:01 2008
@@ -63,6 +63,7 @@
         }
         catch (Exception e)
         {
+            _logger.error("exception creating session:", e);
             throw new JMSAMQException("cannot create session", e);
         }
         return session;

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=668311&r1=668310&r2=668311&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Jun 16 14:04:01 2008
@@ -2745,6 +2745,8 @@
             {
                 while (!_closed.get() && ((message = (UnprocessedMessage) _queue.take()) != null))
                 {
+                    long deliveryTag = message.getDeliveryTag();
+
                     synchronized (_lock)
                     {
 
@@ -2753,27 +2755,24 @@
                             _lock.wait();
                         }
 
-                        if (message.getDeliveryTag() <= _rollbackMark.get())
+                        if (tagLE(deliveryTag, _rollbackMark.get()))
                         {
                             rejectMessage(message, true);
                         }
                         else
                         {
-                            if (message.getDeliveryTag() <= _rollbackMark.get())
-                            {
-                                rejectMessage(message, true);
-                            }
-                            else
+                            synchronized (_messageDeliveryLock)
                             {
-                                synchronized (_messageDeliveryLock)
-                                {
-                                    dispatchMessage(message);
-                                }
+                                dispatchMessage(message);
                             }
                         }
-
                     }
 
+                    long current = _rollbackMark.get();
+                    if (updateRollbackMark(current, deliveryTag))
+                    {
+                        _rollbackMark.compareAndSet(current, deliveryTag);
+                    }
                 }
             }
             catch (InterruptedException e)
@@ -2851,6 +2850,10 @@
         }
     }
 
+    abstract boolean tagLE(long tag1, long tag2);
+
+    abstract boolean updateRollbackMark(long current, long deliveryTag);
+
     /*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write,
         boolean read) throws AMQException
     {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=668311&r1=668310&r2=668311&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Mon Jun 16 14:04:01 2008
@@ -27,6 +27,7 @@
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.util.Serial;
 import org.apache.qpidity.nclient.Session;
 import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
 import org.apache.qpidity.ErrorCode;
@@ -785,4 +786,15 @@
             throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
         }
     }
+
+    final boolean tagLE(long tag1, long tag2)
+    {
+        return Serial.le((int) tag1, (int) tag2);
+    }
+
+    final boolean updateRollbackMark(long currentMark, long deliveryTag)
+    {
+        return Serial.lt((int) currentMark, (int) deliveryTag);
+    }
+
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=668311&r1=668310&r2=668311&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Mon Jun 16 14:04:01 2008
@@ -40,7 +40,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class AMQSession_0_8 extends AMQSession
+public final class AMQSession_0_8 extends AMQSession
 {
 
     /** Used for debugging. */
@@ -453,4 +453,14 @@
         return okHandler._messageCount;
     }
 
+    final boolean tagLE(long tag1, long tag2)
+    {
+        return tag1 <= tag2;
+    }
+
+    final boolean updateRollbackMark(long currentMark, long deliveryTag)
+    {
+        return false;
+    }
+
 }