You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/01/11 19:59:06 UTC

svn commit: r368086 - in /incubator/activemq/trunk/openwire-dotnet: src/OpenWire.Client/ src/OpenWire.Client/Core/ tests/OpenWire.Client/

Author: jstrachan
Date: Wed Jan 11 10:58:57 2006
New Revision: 368086

URL: http://svn.apache.org/viewcvs?rev=368086&view=rev
Log:
added a rough implementation of the ITransport interface; needs more work on the concurrent processing side of things though

Added:
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs   (with props)
Modified:
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/ConnectionFactory.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/Command.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/FutureResponse.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/ITransport.cs
    incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/ClientTest.cs
    incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/OpenWire.Client.build

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs?rev=368086&r1=368085&r2=368086&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs Wed Jan 11 10:58:57 2006
@@ -47,6 +47,7 @@
                                 session.Dispose(); 
                         }
                         sessions.Clear();
+                        transport.Dispose();
                         closed = true; 
                 }
 

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/ConnectionFactory.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/ConnectionFactory.cs?rev=368086&r1=368085&r2=368086&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/ConnectionFactory.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/ConnectionFactory.cs Wed Jan 11 10:58:57 2006
@@ -78,8 +78,7 @@
                 }
 
                 protected ITransport CreateITransport() {
-                        // TODO
-                        return null; 
+                        return new SocketTransport(host, port); 
                 } 
         } 
 }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/Command.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/Command.cs?rev=368086&r1=368085&r2=368086&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/Command.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/Command.cs Wed Jan 11 10:58:57 2006
@@ -6,5 +6,17 @@
         /// An OpenWire command
         /// </summary>
         public interface Command : DataStructure {
+                
+                /* TODO
+                short CommandId {
+                        get;
+                        set; 
+                }
+
+                bool ResponseRequired {
+                        get;
+                        set;
+                } 
+                */
         } 
 }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/FutureResponse.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/FutureResponse.cs?rev=368086&r1=368085&r2=368086&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/FutureResponse.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/FutureResponse.cs Wed Jan 11 10:58:57 2006
@@ -32,12 +32,18 @@
                 }
 
                 public Response Response {
-                        get { return response; }
+                        get {
+                                // TODO use the proper .Net version of notify/wait()
+                                while (response == null) {
+                                        Thread.Sleep(100);
+                                }
+                                return response;
+                        }
                         set {
                                 asyncWaitHandle.WaitOne();
                                 response = value;
                                 isCompleted = true;
-                                asyncWaitHandle.ReleaseMutex();
+                                asyncWaitHandle.ReleaseMutex(); 
                         }
                 } 
         } 

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/ITransport.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/ITransport.cs?rev=368086&r1=368085&r2=368086&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/ITransport.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/ITransport.cs Wed Jan 11 10:58:57 2006
@@ -12,7 +12,7 @@
         /// <summary>
         /// Represents the logical networking transport layer.
         /// </summary>
-        public interface ITransport {
+        public interface ITransport : IDisposable {
                 void Oneway(Command command);
 
                 FutureResponse AsyncRequest(Command command);

Added: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs?rev=368086&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs (added)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs Wed Jan 11 10:58:57 2006
@@ -0,0 +1,144 @@
+using System;
+using System.Collections;
+using System.IO;
+using System.Net;
+using System.Net.Sockets;
+using System.Text;
+using System.Threading;
+
+
+using OpenWire.Client;
+using OpenWire.Client.Commands;
+using OpenWire.Client.Core;
+using OpenWire.Client.IO;
+
+namespace OpenWire.Client.Core {
+
+        /// <summary>
+        /// An implementation of ITransport that uses sockets to communicate with the broker.
+        /// Commands are written by the calling thread (serialised by a lock) while a
+        /// dedicated background thread reads incoming commands and dispatches them either
+        /// to the pending FutureResponse or to the Command / Exception event handlers.
+        /// </summary>
+        public class SocketTransport : ITransport {
+                // Guards writes to the socket, command id allocation and shutdown.
+                private readonly object transmissionLock = new object();
+                private readonly Socket socket;
+                private readonly BinaryReader socketReader;
+                private readonly BinaryWriter socketWriter;
+                private readonly Thread readThread;
+                // Written by Dispose() and polled by the read thread, hence volatile.
+                private volatile bool closed;
+                // Pending requests keyed by command id. Shared between caller threads and
+                // the read thread, so a synchronized wrapper is used.
+                private IDictionary requestMap = Hashtable.Synchronized(new Hashtable());
+                private short nextCommandId;
+
+                /// <summary>Raised on the read thread for every inbound command that is not a Response.</summary>
+                public event CommandHandler Command;
+                /// <summary>Raised on the read thread when the broker reports an error or the read fails.</summary>
+                public event ExceptionHandler Exception;
+
+                /// <summary>
+                /// Connects to the given host and port and starts the background read thread.
+                /// </summary>
+                /// <param name="host">Host name or address of the broker.</param>
+                /// <param name="port">TCP port of the broker.</param>
+                public SocketTransport(string host, int port) {
+                        Console.WriteLine("Opening socket to: " + host + " on port: " + port);
+                        socket = Connect(host, port);
+                        socketWriter = new BinaryWriter(new NetworkStream(socket));
+                        socketReader = new BinaryReader(new NetworkStream(socket));
+
+                        // now lets create the background read thread
+                        readThread = new Thread(new ThreadStart(ReadLoop));
+                        readThread.Start();
+                }
+
+                /// <summary>
+                /// Sends a command without asking the broker for a response.
+                /// </summary>
+                /// <param name="command">The command to send; must be a BaseCommand.</param>
+                public void Oneway(Command command) {
+                        BaseCommand baseCommand = (BaseCommand) command;
+                        baseCommand.CommandId = GetNextCommandId();
+                        baseCommand.ResponseRequired = false;
+                        Send(command);
+                }
+
+                /// <summary>
+                /// Sends a command that requires a response and returns a future for it.
+                /// </summary>
+                /// <param name="command">The command to send; must be a BaseCommand.</param>
+                /// <returns>A FutureResponse that the read thread completes when the reply arrives.</returns>
+                public FutureResponse AsyncRequest(Command command) {
+                        BaseCommand baseCommand = (BaseCommand) command;
+                        baseCommand.CommandId = GetNextCommandId();
+                        baseCommand.ResponseRequired = true;
+
+                        // Register the future BEFORE sending: otherwise a fast reply could be
+                        // read before the entry exists and would be dropped as an unknown id.
+                        FutureResponse future = new FutureResponse();
+                        requestMap[baseCommand.CommandId] = future;
+                        try {
+                                Send(command);
+                        } catch {
+                                // Nothing was sent, so no reply will ever arrive for this id.
+                                requestMap.Remove(baseCommand.CommandId);
+                                throw;
+                        }
+                        return future;
+                }
+
+                /// <summary>
+                /// Sends a command and blocks until its response has been received.
+                /// </summary>
+                /// <param name="command">The command to send; must be a BaseCommand.</param>
+                /// <returns>The broker's response; this may be an ExceptionResponse if the broker reported an error.</returns>
+                public Response Request(Command command) {
+                        FutureResponse response = AsyncRequest(command);
+                        return response.Response;
+                }
+
+                /// <summary>
+                /// Closes the socket and its reader/writer. Safe to call more than once.
+                /// </summary>
+                public void Dispose() {
+                        lock (transmissionLock) {
+                                if (closed) {
+                                        return;
+                                }
+                                // Flag first so the read thread treats the resulting read
+                                // failure as a normal shutdown rather than an error.
+                                closed = true;
+                                Console.WriteLine("Closing the socket");
+                                socket.Close();
+                        }
+                        socketWriter.Close();
+                        socketReader.Close();
+                }
+
+                /// <summary>
+                /// Body of the background read thread: reads commands until the transport is
+                /// closed, completing pending futures for responses and raising the Command
+                /// event for everything else.
+                /// </summary>
+                public void ReadLoop() {
+                        Console.WriteLine("Starting to read commands from ActiveMQ");
+                        while (!closed) {
+                                BaseCommand command = null;
+                                try {
+                                        command = (BaseCommand) CommandMarshallerRegistry.ReadCommand(socketReader);
+                                } catch (ObjectDisposedException) {
+                                        // stream closed
+                                        break;
+                                } catch (IOException e) {
+                                        // A socket closed during a blocking read surfaces as an
+                                        // IOException; only report it if we did not close it ourselves.
+                                        if (!closed) {
+                                                ExceptionHandler readErrorHandler = this.Exception;
+                                                if (readErrorHandler != null) {
+                                                        readErrorHandler(this, e);
+                                                }
+                                        }
+                                        break;
+                                }
+                                if (command is Response) {
+                                        Console.WriteLine("Received response!: " + command);
+                                        Response response = (Response) command;
+                                        // NOTE(review): the lookup uses response.CommandId; confirm the broker
+                                        // echoes the request's id there rather than in a separate correlation field.
+                                        FutureResponse future = (FutureResponse) requestMap[response.CommandId];
+                                        if (future != null) {
+                                                // Each request gets exactly one reply, so drop the entry
+                                                // to stop the map growing without bound.
+                                                requestMap.Remove(response.CommandId);
+
+                                                // Always complete the future, even for an error reply,
+                                                // so that a caller blocked in Request() is released.
+                                                future.Response = response;
+
+                                                if (response is ExceptionResponse) {
+                                                        ExceptionResponse er = (ExceptionResponse) response;
+                                                        ExceptionHandler brokerErrorHandler = this.Exception;
+                                                        if (brokerErrorHandler != null) {
+                                                                brokerErrorHandler(this, new BrokerException(er.Exception));
+                                                        }
+                                                }
+                                        } else {
+                                                Console.WriteLine("Unknown response ID: " + response.CommandId);
+                                        }
+                                } else {
+                                        // Copy the delegate so a concurrent unsubscribe cannot null it
+                                        // between the check and the call.
+                                        CommandHandler commandHandler = this.Command;
+                                        if (commandHandler != null) {
+                                                commandHandler(this, command);
+                                        }
+                                }
+                        }
+                }
+
+
+                // Implementation methods
+
+                /// <summary>
+                /// Marshals the command onto the socket and flushes it; writes are serialised.
+                /// </summary>
+                protected void Send(Command command) {
+                        lock (transmissionLock) {
+                                CommandMarshallerRegistry.WriteCommand(command, socketWriter);
+                                socketWriter.Flush();
+                        }
+                }
+
+                /// <summary>
+                /// Allocates the next command id under the transmission lock.
+                /// </summary>
+                protected short GetNextCommandId() {
+                        lock (transmissionLock) {
+                                return ++nextCommandId;
+                        }
+                }
+
+                /// <summary>
+                /// Resolves the host and returns a socket connected to the first address that accepts.
+                /// </summary>
+                /// <param name="host">Host name or address to resolve.</param>
+                /// <param name="port">TCP port to connect to.</param>
+                /// <returns>A connected socket.</returns>
+                /// <exception cref="SocketException">If no resolved address could be connected to.</exception>
+                protected Socket Connect(string host, int port) {
+                        // Looping through the AddressList allows different type of connections to be tried
+                        // (IPv4, IPv6 and whatever else may be available).
+                        // NOTE(review): Dns.Resolve is marked obsolete on newer frameworks; kept here
+                        // to stay compatible with the runtime this file currently targets.
+                        IPHostEntry hostEntry = Dns.Resolve(host);
+                        SocketException lastError = null;
+                        foreach (IPAddress address in hostEntry.AddressList) {
+                                Socket candidate = null;
+                                try {
+                                        candidate = new Socket(
+                                                address.AddressFamily,
+                                                SocketType.Stream,
+                                                ProtocolType.Tcp);
+                                        candidate.Connect(new IPEndPoint(address, port));
+                                        if (candidate.Connected) {
+                                                return candidate;
+                                        }
+                                } catch (SocketException e) {
+                                        // Remember the failure and fall through to the next address.
+                                        lastError = e;
+                                }
+                                if (candidate != null) {
+                                        // Do not leak the handle of a socket that failed to connect.
+                                        candidate.Close();
+                                }
+                        }
+                        if (lastError != null) {
+                                throw lastError;
+                        }
+                        throw new SocketException();
+                }
+        }
+}

Propchange: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs
------------------------------------------------------------------------------
    svn:executable = *

Modified: incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/ClientTest.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/ClientTest.cs?rev=368086&r1=368085&r2=368086&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/ClientTest.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/ClientTest.cs Wed Jan 11 10:58:57 2006
@@ -17,10 +17,15 @@
                         Assert.IsTrue(factory != null, "created valid factory: " + factory);
                         
                         Console.WriteLine("Worked!");
-                        /*
+                        
                         using (IConnection connection = factory.CreateConnection()) {
                                 ISession session = connection.CreateSession();
+                                Console.WriteLine("Created a session: " + session);
+                                
                                 IDestination destination = session.GetQueue("FOO.BAR");
+                                Assert.IsTrue(destination != null, "No queue available!");
+                                Console.WriteLine("Using destination: " + destination);
+
                                 IMessageConsumer consumer = session.CreateConsumer(destination);
                                 
                                 IMessageProducer producer = session.CreateProducer(destination);
@@ -32,7 +37,6 @@
                                 
                                 Assert.AreEqual(expected, message.Text); 
                         } 
-                        */
                 } 
         } 
 }

Modified: incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/OpenWire.Client.build
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/OpenWire.Client.build?rev=368086&r1=368085&r2=368086&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/OpenWire.Client.build (original)
+++ incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/OpenWire.Client.build Wed Jan 11 10:58:57 2006
@@ -29,8 +29,6 @@
         <include name="log4net.dll" />
         <include name="${build.dir}/bin/${project.name}.dll" />
       </references>
-
-
       <resources failonempty="false" basedir="Resources"
         dynamicprefix="true" prefix="XML:">
         <include name="**/*.xml" />