You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/06/06 05:39:03 UTC

svn commit: r663813 - in /incubator/qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpidity/nclient/ common/src/main/java/org/apache/qpidity/transport/ common/src/main/java/org/apache/qpidity/transport...

Author: rhs
Date: Thu Jun  5 20:39:03 2008
New Revision: 663813

URL: http://svn.apache.org/viewvc?rev=663813&view=rev
Log:
QPID-1062: merge writes of separate frames within an assembly, use sync flag instead of sync command on message transfer

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=663813&r1=663812&r2=663813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Thu Jun  5 20:39:03 2008
@@ -206,16 +206,24 @@
         // send the message
         try
         {
-            ((AMQSession_0_10) getSession()).getQpidSession().messageTransfer(destination.getExchangeName().toString(),
-                    message.get010Message(),
-                    org.apache.qpidity.nclient.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
-                    org.apache.qpidity.nclient.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
-            if(deliveryMode == DeliveryMode.PERSISTENT && getSession().getAMQConnection().getSyncPersistence())
+            org.apache.qpidity.nclient.Session ssn = ((AMQSession_0_10) getSession()).getQpidSession();
+
+            // if true, we need to sync the delivery of this message
+            boolean sync = (deliveryMode == DeliveryMode.PERSISTENT &&
+                            getSession().getAMQConnection().getSyncPersistence());
+
+            if(sync)
             {
-                // we need to sync the delivery of this message
-                ((AMQSession_0_10) getSession()).getQpidSession().sync();
+                ssn.setAutoSync(true);
+            }
+            ssn.messageTransfer(destination.getExchangeName().toString(),
+                                message.get010Message(),
+                                ssn.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
+                                ssn.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+            if (sync)
+            {
+                ssn.setAutoSync(false);
             }
-
         }
         catch (IOException e)
         {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java?rev=663813&r1=663812&r2=663813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java Thu Jun  5 20:39:03 2008
@@ -71,6 +71,8 @@
 
     public byte[] getName();
 
+    public void setAutoSync(boolean value);
+
     //------------------------------------------------------
     //                 Messaging methods
     //                   Producer

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java?rev=663813&r1=663812&r2=663813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java Thu Jun  5 20:39:03 2008
@@ -65,6 +65,7 @@
 
     private byte[] name;
     private long timeout = 60000;
+    private boolean autoSync = false;
 
     // channel may be null
     Channel channel;
@@ -80,6 +81,7 @@
     private int commandsOut = 0;
     private Map<Integer,Method> commands = new HashMap<Integer,Method>();
     private int maxComplete = commandsOut - 1;
+    private boolean needSync = false;
 
     private AtomicBoolean closed = new AtomicBoolean(false);
 
@@ -93,6 +95,14 @@
         return name;
     }
 
+    public void setAutoSync(boolean value)
+    {
+        synchronized (commands)
+        {
+            this.autoSync = value;
+        }
+    }
+
     public Map<Integer,Method> getOutstandingCommands()
     {
         return commands;
@@ -242,7 +252,16 @@
                 {
                     commands.put(next, m);
                 }
+                if (autoSync)
+                {
+                    m.setSync(true);
+                }
+                needSync = !m.isSync();
                 channel.method(m);
+                if (autoSync && !m.hasPayload())
+                {
+                    sync();
+                }
             }
         }
         else
@@ -286,6 +305,13 @@
     public void endData()
     {
         channel.end();
+        synchronized (commands)
+        {
+            if (autoSync)
+            {
+                sync();
+            }
+        }
     }
 
     public void sync()
@@ -300,7 +326,7 @@
         {
             int point = commandsOut - 1;
 
-            if (lt(maxComplete, point))
+            if (needSync && lt(maxComplete, point))
             {
                 ExecutionSync sync = new ExecutionSync();
                 sync.setSync(true);

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java?rev=663813&r1=663812&r2=663813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java Thu Jun  5 20:39:03 2008
@@ -22,6 +22,9 @@
 
 import java.nio.ByteBuffer;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.qpidity.transport.Constant;
 import org.apache.qpidity.transport.ProtocolError;
 import org.apache.qpidity.transport.ProtocolHeader;
@@ -40,6 +43,8 @@
 
     private Sender<ByteBuffer> sender;
     private Object lock = new Object();
+    private int bytes = 0;
+    private List<Frame> frames = new ArrayList<Frame>();
 
     public OutputHandler(Sender<ByteBuffer> sender)
     {
@@ -69,24 +74,37 @@
 
     public void frame(Frame frame)
     {
-        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + frame.getSize());
-        buf.put(frame.getFlags());
-        buf.put((byte) frame.getType().getValue());
-        buf.putShort((short) (frame.getSize() + HEADER_SIZE));
-        // RESERVED
-        buf.put(RESERVED);
-        buf.put(frame.getTrack());
-        buf.putShort((short) frame.getChannel());
-        // RESERVED
-        buf.putInt(0);
-        for(ByteBuffer frg : frame)
-        {
-            buf.put(frg);
-        }
-        buf.flip();
         synchronized (lock)
         {
-            sender.send(buf);
+            frames.add(frame);
+            bytes += HEADER_SIZE + frame.getSize();
+
+            if (frame.isLastFrame() && frame.isLastSegment() || bytes > 64*1024)
+            {
+                ByteBuffer buf = ByteBuffer.allocate(bytes);
+                for (Frame f : frames)
+                {
+                    buf.put(f.getFlags());
+                    buf.put((byte) f.getType().getValue());
+                    buf.putShort((short) (f.getSize() + HEADER_SIZE));
+                    // RESERVED
+                    buf.put(RESERVED);
+                    buf.put(f.getTrack());
+                    buf.putShort((short) f.getChannel());
+                    // RESERVED
+                    buf.putInt(0);
+                    for(ByteBuffer frg : f)
+                    {
+                        buf.put(frg);
+                    }
+                }
+                buf.flip();
+
+                frames.clear();
+                bytes = 0;
+
+                sender.send(buf);
+            }
         }
     }