You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/11/09 14:16:29 UTC

svn commit: r593519 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache: qpid/jndi/PropertiesFileInitialContextFactory.java qpidity/nclient/impl/ClientSession.java

Author: arnaudsimon
Date: Fri Nov  9 05:16:28 2007
New Revision: 593519

URL: http://svn.apache.org/viewvc?rev=593519&view=rev
Log:
added property for setting max message size before a sync

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?rev=593519&r1=593518&r2=593519&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Fri Nov  9 05:16:28 2007
@@ -86,6 +86,7 @@
                 p.load(new BufferedInputStream(new FileInputStream(file)));
 
                 environment.putAll(p);
+                System.getProperties().putAll(p);
                 _logger.info("Loaded Context Properties:" + environment.toString());
             }
             else
@@ -97,6 +98,8 @@
         {
             _logger.warn("Unable to load property file specified in Provider_URL:" + environment.get(Context.PROVIDER_URL));
         }
+
+        
 
         createConnectionFactories(data, environment);
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=593519&r1=593518&r2=593519&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java Fri Nov  9 05:16:28 2007
@@ -4,6 +4,8 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
+import java.nio.ByteBuffer;
 
 import org.apache.qpidity.transport.Option;
 import org.apache.qpidity.QpidException;
@@ -18,11 +20,31 @@
  */
 public class ClientSession extends org.apache.qpidity.transport.Session implements  org.apache.qpidity.nclient.DtxSession
 {
+    static
+    {
+        MAX_NOT_SYNC_DATA_LENGH = 200000 * 1024;
+        String max = "message_size_before_sync";       
+        if (System.getProperties().containsKey(max))
+        {
+            try
+            {
+                MAX_NOT_SYNC_DATA_LENGH = new Long(System.getProperties().getProperty(max));
+            }
+            catch (NumberFormatException e)
+            {
+                // use default size
+            }
+        }
+    }
+
+    private static  long MAX_NOT_SYNC_DATA_LENGH  ;
     private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>();
     private ClosedListener _exceptionListner;
     private RangeSet _acquiredMessages;
     private RangeSet _rejectedMessages;
-        
+    private long _dataSentNotSync;
+
+
     public void messageAcknowledge(RangeSet ranges)
     {
         for (Range range : ranges)
@@ -42,12 +64,24 @@
     {
         // The javadoc clearly says that this method is suitable for small messages
         // therefore reading the content in one shot.
+        ByteBuffer  data = msg.readData();
+        _dataSentNotSync = _dataSentNotSync + msg.getMessageProperties().getContentLength() + data.limit();
         super.messageTransfer(destination, confirmMode, acquireMode);
         super.header(msg.getDeliveryProperties(),msg.getMessageProperties());
-        super.data(msg.readData());
-        super.endData();        
+        super.data( data );
+        super.endData();
+        if( _dataSentNotSync >= MAX_NOT_SYNC_DATA_LENGH)
+        {
+            sync();
+        }
+    }
+
+    public void sync()
+    {
+        _dataSentNotSync = 0;
+        super.sync();
     }
-    
+
     public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException
     {
         super.messageTransfer(destination, confirmMode, acquireMode);