You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/11/09 14:16:29 UTC
svn commit: r593519 - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache:
qpid/jndi/PropertiesFileInitialContextFactory.java
qpidity/nclient/impl/ClientSession.java
Author: arnaudsimon
Date: Fri Nov 9 05:16:28 2007
New Revision: 593519
URL: http://svn.apache.org/viewvc?rev=593519&view=rev
Log:
added property for setting max message size before a sync
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?rev=593519&r1=593518&r2=593519&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Fri Nov 9 05:16:28 2007
@@ -86,6 +86,7 @@
p.load(new BufferedInputStream(new FileInputStream(file)));
environment.putAll(p);
+ System.getProperties().putAll(p);
_logger.info("Loaded Context Properties:" + environment.toString());
}
else
@@ -97,6 +98,8 @@
{
_logger.warn("Unable to load property file specified in Provider_URL:" + environment.get(Context.PROVIDER_URL));
}
+
+
createConnectionFactories(data, environment);
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=593519&r1=593518&r2=593519&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java Fri Nov 9 05:16:28 2007
@@ -4,6 +4,8 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
+import java.nio.ByteBuffer;
import org.apache.qpidity.transport.Option;
import org.apache.qpidity.QpidException;
@@ -18,11 +20,31 @@
*/
public class ClientSession extends org.apache.qpidity.transport.Session implements org.apache.qpidity.nclient.DtxSession
{
+ static
+ {
+ MAX_NOT_SYNC_DATA_LENGH = 200000 * 1024;
+ String max = "message_size_before_sync";
+ if (System.getProperties().containsKey(max))
+ {
+ try
+ {
+ MAX_NOT_SYNC_DATA_LENGH = new Long(System.getProperties().getProperty(max));
+ }
+ catch (NumberFormatException e)
+ {
+ // use default size
+ }
+ }
+ }
+
+ private static long MAX_NOT_SYNC_DATA_LENGH ;
private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>();
private ClosedListener _exceptionListner;
private RangeSet _acquiredMessages;
private RangeSet _rejectedMessages;
-
+ private long _dataSentNotSync;
+
+
public void messageAcknowledge(RangeSet ranges)
{
for (Range range : ranges)
@@ -42,12 +64,24 @@
{
// The javadoc clearly says that this method is suitable for small messages
// therefore reading the content in one shot.
+ ByteBuffer data = msg.readData();
+ _dataSentNotSync = _dataSentNotSync + msg.getMessageProperties().getContentLength() + data.limit();
super.messageTransfer(destination, confirmMode, acquireMode);
super.header(msg.getDeliveryProperties(),msg.getMessageProperties());
- super.data(msg.readData());
- super.endData();
+ super.data( data );
+ super.endData();
+ if( _dataSentNotSync >= MAX_NOT_SYNC_DATA_LENGH)
+ {
+ sync();
+ }
+ }
+
+ public void sync()
+ {
+ _dataSentNotSync = 0;
+ super.sync();
}
-
+
public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException
{
super.messageTransfer(destination, confirmMode, acquireMode);