You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2008/09/12 09:34:45 UTC

svn commit: r694628 - in /incubator/qpid/trunk/qpid/dotnet/client-010: client/transport/network/ client/transport/network/io/ test/interop/

Author: arnaudsimon
Date: Fri Sep 12 00:34:44 2008
New Revision: 694628

URL: http://svn.apache.org/viewvc?rev=694628&view=rev
Log:
qpid-1277: fixed large messages issue; added more tests

Added:
    incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/IIoSender.cs
    incubator/qpid/trunk/qpid/dotnet/client-010/test/interop/Message.cs
Modified:
    incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/Assembler.cs
    incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/Disassembler.cs
    incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/InputHandler.cs
    incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs
    incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs
    incubator/qpid/trunk/qpid/dotnet/client-010/test/interop/TestCase.cs

Modified: incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/Assembler.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/Assembler.cs?rev=694628&r1=694627&r2=694628&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/Assembler.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/Assembler.cs Fri Sep 12 00:34:44 2008
@@ -35,7 +35,7 @@
     public class Assembler : NetworkDelegate, Receiver<ReceivedPayload<ProtocolEvent>>
     {
         private static readonly Logger log = Logger.get(typeof (Assembler));
-        private readonly Dictionary<int, List<Frame>> segments;
+        private readonly Dictionary<int, List<byte[]>> segments;
         private readonly Method[] incomplete;
         [ThreadStatic] static MSDecoder _decoder;
         private readonly Object m_objectLock = new object();
@@ -101,7 +101,7 @@
 
         public Assembler()
         {
-            segments = new Dictionary<int, List<Frame>>();
+            segments = new Dictionary<int, List<byte[]>>();
             incomplete = new Method[64*1024];
         }
 
@@ -133,27 +133,28 @@
             }
             else
             {
-                List<Frame> frames;
+                List<byte[]> frames;
                 if (frame.isFirstFrame())
                 {
-                    frames = new List<Frame>();
+                    frames = new List<byte[]>();
                     setSegment(frame, frames);
                 }
                 else
                 {
                     frames = getSegment(frame);
                 }
-
-                frames.Add(frame);
+                byte[] tmp = new byte[frame.BodySize];
+                frame.Body.Read(tmp, 0, tmp.Length);
+                frames.Add(tmp);
 
                 if (frame.isLastFrame())
                 {
                     clearSegment(frame);
                     segment = new MemoryStream();
                     BinaryWriter w = new BinaryWriter(segment);
-                    foreach (Frame f in frames)
+                    foreach (byte[] f in frames)
                     {
-                        w.Write(f.Body.ToArray());
+                        w.Write(f);
                     }
                     assemble(frame, segment);
                 }
@@ -219,13 +220,9 @@
                     }
                     break;
                 case SegmentType.BODY:
-                    command = incomplete[channel];
-                    byte[] b = new byte[frame.BodySize];
-                    MemoryStream body = new MemoryStream();
-                    segment.Read(b, 0, b.Length);
-                    body.Write(b, 0, b.Length);
-                    body.Seek(0, SeekOrigin.Begin);
-                    command.Body = body;
+                    command = incomplete[channel];                  
+                    segment.Seek(0, SeekOrigin.Begin);
+                    command.Body = segment;
                     incomplete[channel] = null;
                     Emit(channel, command);
                     break;
@@ -239,12 +236,12 @@
             return (frame.Track + 1)*frame.Channel;
         }
 
-        private List<Frame> getSegment(Frame frame)
+        private List<byte[]> getSegment(Frame frame)
         {
             return segments[segmentKey(frame)];
         }
 
-        private void setSegment(Frame frame, List<Frame> segment)
+        private void setSegment(Frame frame, List<byte[]> segment)
         {
             int key = segmentKey(frame);
             if (segments.ContainsKey(key))

Modified: incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/Disassembler.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/Disassembler.cs?rev=694628&r1=694627&r2=694628&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/Disassembler.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/Disassembler.cs Fri Sep 12 00:34:44 2008
@@ -30,7 +30,7 @@
     /// </summary>
     public sealed class Disassembler : Sender<ProtocolEvent>, ProtocolDelegate<Object>
     {
-        private readonly Sender<MemoryStream> _sender;
+        private readonly IIOSender<MemoryStream> _sender;
         private readonly int _maxPayload;
         private readonly MemoryStream _header;
         private readonly BinaryWriter _writer;
@@ -38,7 +38,7 @@
         [ThreadStatic] static MSEncoder _encoder;
 
 
-        public Disassembler(Sender<MemoryStream> sender, int maxFrame)
+        public Disassembler(IIOSender<MemoryStream> sender, int maxFrame)
         {
             if (maxFrame <= Frame.HEADER_SIZE || maxFrame >= 64*1024)
             {
@@ -120,8 +120,8 @@
                 _writer.Write((byte)0);
                _writer.Write((byte)0);
                 _sender.send(_header);
-                _header.Seek(0, SeekOrigin.Begin);
-                _sender.send(buf);
+                _header.Seek(0, SeekOrigin.Begin);               
+                _sender.send(buf, size);
             }
         }
 
@@ -129,13 +129,13 @@
         {
             byte typeb = (byte) type;
             byte track = mevent.EncodedTrack == Frame.L4 ? (byte) 1 : (byte) 0;
-
             int remaining = (int) buf.Length;
+            buf.Seek(0, SeekOrigin.Begin);
             bool first = true;
             while (true)
             {
                 int size = Math.Min(_maxPayload, remaining);
-                remaining -= size;
+                remaining -= size;              
 
                 byte newflags = flags;
                 if (first)
@@ -146,7 +146,7 @@
                 if (remaining == 0)
                 {
                     newflags |= Frame.LAST_FRAME;
-                }
+                }                
 
                 frame(newflags, typeb, track, mevent.Channel, size, buf);
 

Added: incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/IIoSender.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/IIoSender.cs?rev=694628&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/IIoSender.cs (added)
+++ incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/IIoSender.cs Fri Sep 12 00:34:44 2008
@@ -0,0 +1,28 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+namespace org.apache.qpid.transport.network
+{
+    public interface IIOSender<T>:Sender<T>
+    {
+        void send(T body, int siz);
+    }
+}

Modified: incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/InputHandler.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/InputHandler.cs?rev=694628&r1=694627&r2=694628&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/InputHandler.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/InputHandler.cs Fri Sep 12 00:34:44 2008
@@ -127,7 +127,11 @@
         public void On_ReceivedBuffer(object sender, ReceivedPayload<MemoryStream> payload)
         {
             MemoryStream buf = payload.Payload;
-            int remaining = (int)buf.Length;
+            int remaining = (int) buf.Length;
+            if( input != null )
+            {
+                remaining += (int) input.Length;
+            }
             try
             {
                 while (remaining > 0)
@@ -136,10 +140,9 @@
                     {                        
                         if (input != null)
                         {
-                            remaining += (int) input.Length;
-                            byte[] tmp = new byte[remaining];
-                            buf.Read(tmp, 0, remaining);
-                            input.Write(tmp, 0, remaining);
+                            byte[] tmp = new byte[buf.Length];
+                            buf.Read(tmp, 0, tmp.Length);
+                            input.Write(tmp, 0, tmp.Length);
                             input.Seek(0, SeekOrigin.Begin);
                             buf = input;    
                         }                      
@@ -155,13 +158,19 @@
                     }
                     else
                     {
+                        byte[] tmp;
                         if (input == null)
                         {
                             input = new MemoryStream();
+                            tmp = new byte[remaining];                            
+                        }
+                        else
+                        {
+                            // this is a full buffer 
+                            tmp = new byte[buf.Length];
                         }
-                        byte[] tmp = new byte[remaining];
-                        buf.Read(tmp, 0, remaining);
-                        input.Write(tmp, 0, remaining);                       
+                        buf.Read(tmp, 0, tmp.Length);
+                        input.Write(tmp, 0, tmp.Length);
                         remaining = 0;
                     }
                 }

Modified: incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs?rev=694628&r1=694627&r2=694628&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs Fri Sep 12 00:34:44 2008
@@ -21,7 +21,6 @@
 using System;
 using System.IO;
 using System.Threading;
-using TransportException = org.apache.qpid.transport.TransportException;
 using Logger = org.apache.qpid.transport.util.Logger;
 
 

Modified: incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs?rev=694628&r1=694627&r2=694628&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs Fri Sep 12 00:34:44 2008
@@ -17,7 +17,6 @@
 * under the License.
 */
 using System;
-using System.Collections.Generic;
 using System.IO;
 using System.Net.Sockets;
 using System.Threading;
@@ -26,7 +25,7 @@
 
 namespace org.apache.qpid.transport.network.io
 {
-    public sealed class IoSender : Sender<MemoryStream>
+    public sealed class IoSender : IIOSender<MemoryStream>
     {
         private static readonly Logger log = Logger.get(typeof (IoReceiver));
         private readonly NetworkStream bufStream;
@@ -35,12 +34,12 @@
         private readonly CircularBuffer<byte[]> queue;
         private readonly Thread thread;
         private readonly int timeout;
-        private MemoryStream _tobeSent = new MemoryStream();
+        private readonly MemoryStream _tobeSent = new MemoryStream();
         public IoSender(IoTransport transport, int queueSize, int timeout)
         {
             this.timeout = timeout;
             bufStream = transport.Stream;
-            queue = new CircularBuffer<byte[]>(2);
+            queue = new CircularBuffer<byte[]>(queueSize);
             thread = new Thread(Go);
             log.debug("Creating IoSender thread");
             thread.Name = String.Format("IoSender - {0}", transport.Socket) ;
@@ -48,20 +47,24 @@
             thread.Start();
         }
 
-
         public void send(MemoryStream str)
         {
+            int pos = (int) str.Position;
+            str.Seek(0, SeekOrigin.Begin);
+            send(str, pos);
+        }
+
+        public void send(MemoryStream str, int size)
+        {
             mutClosed.WaitOne();
             if (closed)
             {
                 throw new TransportException("sender is closed");
             }
-            mutClosed.ReleaseMutex();
-            int length = (int)str.Position;
-            str.Seek(0, SeekOrigin.Begin);           
-            byte[] buf = new byte[length];
-            str.Read(buf, 0, length);
-            _tobeSent.Write(buf, 0, length);          
+            mutClosed.ReleaseMutex();                    
+            byte[] buf = new byte[size];
+            str.Read(buf, 0, size);
+            _tobeSent.Write(buf, 0, size);          
         }
 
         public void flush()

Added: incubator/qpid/trunk/qpid/dotnet/client-010/test/interop/Message.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/client-010/test/interop/Message.cs?rev=694628&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/client-010/test/interop/Message.cs (added)
+++ incubator/qpid/trunk/qpid/dotnet/client-010/test/interop/Message.cs Fri Sep 12 00:34:44 2008
@@ -0,0 +1,181 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+using System;
+using System.IO;
+using System.Text;
+using System.Threading;
+using client.client;
+using NUnit.Framework;
+using org.apache.qpid.client;
+using org.apache.qpid.transport;
+using org.apache.qpid.transport.util;
+
+namespace test.interop
+{
+    public class Message : TestCase
+    {
+        private static readonly Logger _log = Logger.get(typeof (Message));
+
+        [Test]
+        public void sendAndPurge()
+        {
+            _log.debug("Running: exchangeBind");
+            ClientSession ssn = Client.createSession(0);
+            ssn.queueDelete("queue1");
+            QueueQueryResult result = (QueueQueryResult) ssn.queueQuery("queue1").Result;
+            Assert.IsNull(result.getQueue());
+            ssn.queueDeclare("queue1", null, null);
+            ssn.exchangeBind("queue1", "amq.direct", "queue1", null);
+            for (int i = 0; i < 10; i++)
+            {
+                ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+                                    new Header(new DeliveryProperties().setRoutingKey("queue1"),
+                                               new MessageProperties().setMessageId(UUID.randomUUID())),
+                                    Encoding.UTF8.GetBytes("test: " + i));
+            }
+            ssn.sync();
+            result = (QueueQueryResult) ssn.queueQuery("queue1").Result;
+            Assert.IsTrue(result.getMessageCount() == 10);
+            ssn.queuePurge("queue1");
+            ssn.sync();
+            result = (QueueQueryResult) ssn.queueQuery("queue1").Result;
+            Assert.IsTrue(result.getMessageCount() == 0);
+        }
+
+        [Test]
+        public void sendAndReceiveSmallMessages()
+        {
+            _log.debug("Running: sendAndReceiveSmallMessages");
+            byte[] smallMessage = Encoding.UTF8.GetBytes("test");
+            sendAndReceive(smallMessage, 100);
+        }
+
+        [Test]
+        public void sendAndReceiveLargeMessages()
+        {
+            _log.debug("Running: sendAndReceiveSmallMessages");
+            byte[] largeMessage = new byte[100 * 1024];
+            Random random = new Random();
+            random.NextBytes(largeMessage);
+            sendAndReceive(largeMessage, 10);
+        }
+
+        [Test]
+        public void sendAndReceiveVeryLargeMessages()
+        {
+            _log.debug("Running: sendAndReceiveSmallMessages");
+            byte[] verylargeMessage = new byte[1000 * 1024];
+            Random random = new Random();
+            random.NextBytes(verylargeMessage);
+            sendAndReceive(verylargeMessage, 2);
+        }
+
+        private void sendAndReceive(byte[] messageBody, int count)
+        {           
+            ClientSession ssn = Client.createSession(0);
+            ssn.sync();
+            ssn.queueDeclare("queue1", null, null);
+            ssn.queueDelete("queue1");           
+            QueueQueryResult result = (QueueQueryResult) ssn.queueQuery("queue1").Result;            
+            Assert.IsNull(result.getQueue());
+            ssn.queueDeclare("queue1", null, null);
+            ssn.exchangeBind("queue1", "amq.direct", "queue1", null);
+            Object myLock = new Object();
+            MyListener myListener = new MyListener(myLock, count);
+            ssn.attachMessageListener(myListener, "myDest");
+
+            ssn.messageSubscribe("queue1", "myDest", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED, null,
+                                 0, null);
+
+
+            // issue credits     
+            ssn.messageSetFlowMode("myDest", MessageFlowMode.WINDOW);
+            ssn.messageFlow("myDest", MessageCreditUnit.BYTE, ClientSession.MESSAGE_FLOW_MAX_BYTES);
+            ssn.messageFlow("myDest", MessageCreditUnit.MESSAGE, 10000);
+            ssn.sync();
+
+            for (int i = 0; i < count; i++)
+            {
+                ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+                                    new Header(new DeliveryProperties().setRoutingKey("queue1"),
+                                               new MessageProperties().setMessageId(UUID.randomUUID())),
+                                    messageBody);
+            }
+            ssn.sync();
+           
+            lock (myLock)
+            {
+                if (myListener.Count != 0)
+                {
+                    Monitor.Wait(myLock, 10000000);
+                }
+            }
+            Assert.IsTrue(myListener.Count == 0);
+            ssn.messageAccept(myListener.UnAck);
+            ssn.sync();
+            // the queue should be empty 
+            result = (QueueQueryResult)ssn.queueQuery("queue1").Result;
+            Assert.IsTrue(result.getMessageCount() == 0);
+            ssn.close();        
+        }
+
+
+
+        private class MyListener : MessageListener
+        {
+            private static readonly Logger _log = Logger.get(typeof (MyListener));
+            private readonly Object _wl;
+            private int _count;
+            private RangeSet _rs = new RangeSet();
+
+            public MyListener(Object wl, int count)
+            {
+                _wl = wl;
+                _count = count;
+            }
+
+            public void messageTransfer(MessageTransfer m)
+            {
+                byte[] body = new byte[m.Body.Length - m.Body.Position];                               
+                _log.debug("Got a message of size: " + body.Length + " count = " + _count);
+                _rs.add(m.Id);
+                lock (_wl)
+                {
+                    _count--;
+                    if (_count == 0)
+                    {
+                        Monitor.PulseAll(_wl);
+                    }
+                }
+            }
+
+            public int Count
+            {
+                get { return _count; }
+            }
+
+            public RangeSet UnAck
+            {
+                get { return _rs; }
+            }
+        }
+    }
+}

Modified: incubator/qpid/trunk/qpid/dotnet/client-010/test/interop/TestCase.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/client-010/test/interop/TestCase.cs?rev=694628&r1=694627&r2=694628&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/client-010/test/interop/TestCase.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/client-010/test/interop/TestCase.cs Fri Sep 12 00:34:44 2008
@@ -43,18 +43,16 @@
             // populate default properties
             _properties.Add("UserName", "guest");
             _properties.Add("Password", "guest");
-            _properties.Add("Host", "192.168.1.14");
-            _properties.Add("Port", "5673");
+            _properties.Add("Host", "localhost");
+            _properties.Add("Port", "5672");
             _properties.Add("VirtualHost", "test");
              //Read the test config file  
             XmlTextReader reader = new XmlTextReader(Environment.CurrentDirectory + ".\\test.config");
             while (reader.Read())
-            {
-                XmlNodeType nType = reader.NodeType;               
+            {                
                 // if node type is an element
                 if (reader.NodeType == XmlNodeType.Element && reader.Name.Equals("add"))
                 {
-                    Console.WriteLine("Element:" + reader.Name.ToString());
                     if (_properties.ContainsKey(reader.GetAttribute("key")))
                     {
                         _properties[reader.GetAttribute("key")] = reader.GetAttribute("value");