You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2009/02/11 20:16:48 UTC

svn commit: r743455 - in /qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport: Method.java Session.java

Author: rhs
Date: Wed Feb 11 19:16:48 2009
New Revision: 743455

URL: http://svn.apache.org/viewvc?rev=743455&view=rev
Log:
QPID-1658: added a byte limit for the number of commands in the session replay buffer, and made the buffer length configurable

Modified:
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java?rev=743455&r1=743454&r2=743455&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java Wed Feb 11 19:16:48 2009
@@ -112,6 +112,19 @@
         throw new UnsupportedOperationException();
     }
 
+    public int getBodySize()
+    {
+        ByteBuffer body = getBody();
+        if (body == null)
+        {
+            return 0;
+        }
+        else
+        {
+            return body.remaining();
+        }
+    }
+
     public abstract byte getEncodedTrack();
 
     public abstract <C> void dispatch(C context, MethodDelegate<C> delegate);

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=743455&r1=743454&r2=743455&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Wed Feb 11 19:16:48 2009
@@ -95,7 +95,9 @@
 
     // outgoing command count
     private int commandsOut = 0;
-    private Method[] commands = new Method[64*1024];
+    private Method[] commands = new Method[Integer.getInteger("qpid.session.command_limit", 64*1024)];
+    private int commandBytes = 0;
+    private int byteLimit = Integer.getInteger("qpid.session.byte_limit", 1024*1024);
     private int maxComplete = commandsOut - 1;
     private boolean needSync = false;
 
@@ -432,7 +434,13 @@
             int old = maxComplete;
             for (int id = max(maxComplete, lower); le(id, upper); id++)
             {
-                commands[mod(id, commands.length)] = null;
+                int idx = mod(id, commands.length);
+                Method m = commands[idx];
+                if (m != null)
+                {
+                    commandBytes -= m.getBodySize();
+                }
+                commands[idx] = null;
             }
             if (le(lower, maxComplete + 1))
             {
@@ -462,7 +470,7 @@
 
     final private boolean isFull(int id)
     {
-        return id - maxComplete >= commands.length;
+        return id - maxComplete >= commands.length || commandBytes >= byteLimit;
     }
 
     public void invoke(Method m)
@@ -542,6 +550,7 @@
                 if (expiry > 0)
                 {
                     commands[mod(next, commands.length)] = m;
+                    commandBytes += m.getBodySize();
                 }
                 if (autoSync)
                 {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org