You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/07/09 15:26:56 UTC

svn commit: r675165 [2/2] - in /incubator/qpid/trunk/qpid/java: client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/ client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpidity/nclient/ client/src/main/java/...

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java Wed Jul  9 06:26:54 2008
@@ -69,6 +69,7 @@
         synchronized (lock)
         {
             sender.send(header.toByteBuffer());
+            sender.flush();
         }
     }
 
@@ -79,32 +80,40 @@
             frames.add(frame);
             bytes += HEADER_SIZE + frame.getSize();
 
-            if (frame.isLastFrame() && frame.isLastSegment() || bytes > 64*1024)
+            if (bytes > 64*1024)
             {
-                ByteBuffer buf = ByteBuffer.allocate(bytes);
-                for (Frame f : frames)
-                {
-                    buf.put(f.getFlags());
-                    buf.put((byte) f.getType().getValue());
-                    buf.putShort((short) (f.getSize() + HEADER_SIZE));
-                    // RESERVED
-                    buf.put(RESERVED);
-                    buf.put(f.getTrack());
-                    buf.putShort((short) f.getChannel());
-                    // RESERVED
-                    buf.putInt(0);
-                    for(ByteBuffer frg : f)
-                    {
-                        buf.put(frg);
-                    }
-                }
-                buf.flip();
-
-                frames.clear();
-                bytes = 0;
+                flush();
+            }
+        }
+    }
 
-                sender.send(buf);
+    public void flush()
+    {
+        synchronized (lock)
+        {
+            ByteBuffer buf = ByteBuffer.allocate(bytes);
+            int nframes = frames.size();
+            for (int i = 0; i < nframes; i++)
+            {
+                Frame frame = frames.get(i);
+                buf.put(frame.getFlags());
+                buf.put((byte) frame.getType().getValue());
+                buf.putShort((short) (frame.getSize() + HEADER_SIZE));
+                // RESERVED
+                buf.put(RESERVED);
+                buf.put(frame.getTrack());
+                buf.putShort((short) frame.getChannel());
+                // RESERVED
+                buf.putInt(0);
+                buf.put(frame.getBody());
             }
+            buf.flip();
+
+            frames.clear();
+            bytes = 0;
+
+            sender.send(buf);
+            sender.flush();
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java Wed Jul  9 06:26:54 2008
@@ -55,6 +55,11 @@
         write(buf);
     }
 
+    public void flush()
+    {
+        // pass
+    }
+
     /* The extra copying sucks.
      * If I know for sure that the buf is backed
      * by an array then I could do buf.array()

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java Wed Jul  9 06:26:54 2008
@@ -58,6 +58,11 @@
         }
     }
 
+    public void flush()
+    {
+        // pass
+    }
+
     public synchronized void close()
     {
         // MINA will sometimes throw away in-progress writes when you

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java Wed Jul  9 06:26:54 2008
@@ -47,6 +47,11 @@
         }
     }
 
+    public void flush()
+    {
+        // pass
+    }
+
     private void write(java.nio.ByteBuffer buf)
     {
         synchronized (lock)