You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/11/29 15:24:39 UTC
svn commit: r599452 -
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
Author: arnaudsimon
Date: Thu Nov 29 06:24:38 2007
New Revision: 599452
URL: http://svn.apache.org/viewvc?rev=599452&view=rev
Log:
added flush
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=599452&r1=599451&r2=599452&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java Thu Nov 29 06:24:38 2007
@@ -31,14 +31,26 @@
// use default size
MAX_NOT_SYNC_DATA_LENGH = 200000000;
}
+ String flush = "message_size_before_flush";
+ try
+ {
+ MAX_NOT_FLUSH_DATA_LENGH = new Long(System.getProperties().getProperty(flush, "2000000"));
+ }
+ catch (NumberFormatException e)
+ {
+ // use default size
+ MAX_NOT_FLUSH_DATA_LENGH = 20000000;
+ }
}
private static long MAX_NOT_SYNC_DATA_LENGH;
+ private static long MAX_NOT_FLUSH_DATA_LENGH;
private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>();
private ClosedListener _exceptionListner;
private RangeSet _acquiredMessages;
private RangeSet _rejectedMessages;
private long _currentDataSizeNotSynced;
+ private long _currentDataSizeNotFlushed;
public void messageAcknowledge(RangeSet ranges)
@@ -80,6 +92,7 @@
public void data(ByteBuffer buf)
{
_currentDataSizeNotSynced = _currentDataSizeNotSynced + buf.remaining();
+ _currentDataSizeNotFlushed = _currentDataSizeNotFlushed + buf.remaining();
super.data(buf);
}
@@ -122,6 +135,11 @@
if( MAX_NOT_SYNC_DATA_LENGH != -1 && _currentDataSizeNotSynced >= MAX_NOT_SYNC_DATA_LENGH)
{
sync();
+ }
+ if( MAX_NOT_FLUSH_DATA_LENGH != -1 && _currentDataSizeNotFlushed >= MAX_NOT_FLUSH_DATA_LENGH)
+ {
+ executionFlush();
+ _currentDataSizeNotFlushed = 0;
}
}