You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/11/29 15:24:39 UTC

svn commit: r599452 - /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java

Author: arnaudsimon
Date: Thu Nov 29 06:24:38 2007
New Revision: 599452

URL: http://svn.apache.org/viewvc?rev=599452&view=rev
Log:
added flush

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=599452&r1=599451&r2=599452&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java Thu Nov 29 06:24:38 2007
@@ -31,14 +31,26 @@
                 // use default size
                 MAX_NOT_SYNC_DATA_LENGH = 200000000;
             }
+            String flush = "message_size_before_flush";
+            try
+            {
+                MAX_NOT_FLUSH_DATA_LENGH = new Long(System.getProperties().getProperty(flush, "2000000"));
+            }
+            catch (NumberFormatException e)
+            {
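+                // property value is not a valid number: fall back to 20000000 (NB: the default used above when the property is unset is "2000000")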
+                MAX_NOT_FLUSH_DATA_LENGH = 20000000;
+            }
     }
 
     private static  long MAX_NOT_SYNC_DATA_LENGH;
+     private static  long MAX_NOT_FLUSH_DATA_LENGH;
     private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>();
     private ClosedListener _exceptionListner;
     private RangeSet _acquiredMessages;
     private RangeSet _rejectedMessages;
     private long _currentDataSizeNotSynced;
+    private long _currentDataSizeNotFlushed;
 
 
     public void messageAcknowledge(RangeSet ranges)
@@ -80,6 +92,7 @@
     public void data(ByteBuffer buf)
     {
         _currentDataSizeNotSynced = _currentDataSizeNotSynced + buf.remaining();
+        _currentDataSizeNotFlushed = _currentDataSizeNotFlushed + buf.remaining(); // running total of bytes handed to data(); reset to 0 after executionFlush() in the size check
         super.data(buf);
     }
 
@@ -122,6 +135,11 @@
         if( MAX_NOT_SYNC_DATA_LENGH != -1 && _currentDataSizeNotSynced >= MAX_NOT_SYNC_DATA_LENGH)
         {
             sync();
+        }
+         if( MAX_NOT_FLUSH_DATA_LENGH != -1 && _currentDataSizeNotFlushed >= MAX_NOT_FLUSH_DATA_LENGH)
+        {
+           executionFlush();
+            _currentDataSizeNotFlushed = 0;
         }
     }