You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/01/11 19:59:06 UTC
svn commit: r368086 - in /incubator/activemq/trunk/openwire-dotnet:
src/OpenWire.Client/ src/OpenWire.Client/Core/ tests/OpenWire.Client/
Author: jstrachan
Date: Wed Jan 11 10:58:57 2006
New Revision: 368086
URL: http://svn.apache.org/viewcvs?rev=368086&view=rev
Log:
Added a rough implementation of the ITransport interface; the concurrent processing side of things still needs more work.
Added:
incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs (with props)
Modified:
incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs
incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/ConnectionFactory.cs
incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/Command.cs
incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/FutureResponse.cs
incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/ITransport.cs
incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/ClientTest.cs
incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/OpenWire.Client.build
Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs?rev=368086&r1=368085&r2=368086&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs Wed Jan 11 10:58:57 2006
@@ -47,6 +47,7 @@
session.Dispose();
}
sessions.Clear();
+ transport.Dispose();
closed = true;
}
Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/ConnectionFactory.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/ConnectionFactory.cs?rev=368086&r1=368085&r2=368086&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/ConnectionFactory.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/ConnectionFactory.cs Wed Jan 11 10:58:57 2006
@@ -78,8 +78,7 @@
}
protected ITransport CreateITransport() {
- // TODO
- return null;
+ return new SocketTransport(host, port);
}
}
}
Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/Command.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/Command.cs?rev=368086&r1=368085&r2=368086&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/Command.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/Command.cs Wed Jan 11 10:58:57 2006
@@ -6,5 +6,17 @@
/// An OpenWire command
/// </summary>
public interface Command : DataStructure {
+
+ /* TODO
+ short CommandId {
+ get;
+ set;
+ }
+
+ bool ResponseRequired {
+ get;
+ set;
+ }
+ */
}
}
Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/FutureResponse.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/FutureResponse.cs?rev=368086&r1=368085&r2=368086&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/FutureResponse.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/FutureResponse.cs Wed Jan 11 10:58:57 2006
@@ -32,12 +32,18 @@
}
public Response Response {
- get { return response; }
+ get {
+ // TODO use the proper .Net version of notify/wait()
+ while (response == null) {
+ Thread.Sleep(100);
+ }
+ return response;
+ }
set {
asyncWaitHandle.WaitOne();
response = value;
isCompleted = true;
- asyncWaitHandle.ReleaseMutex();
+ asyncWaitHandle.ReleaseMutex();
}
}
}
Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/ITransport.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/ITransport.cs?rev=368086&r1=368085&r2=368086&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/ITransport.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/ITransport.cs Wed Jan 11 10:58:57 2006
@@ -12,7 +12,7 @@
/// <summary>
/// Represents the logical networking transport layer.
/// </summary>
- public interface ITransport {
+ public interface ITransport : IDisposable {
void Oneway(Command command);
FutureResponse AsyncRequest(Command command);
Added: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs?rev=368086&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs (added)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs Wed Jan 11 10:58:57 2006
@@ -0,0 +1,144 @@
+using System;
+using System.Collections;
+using System.IO;
+using System.Net;
+using System.Net.Sockets;
+using System.Text;
+using System.Threading;
+
+
+using OpenWire.Client;
+using OpenWire.Client.Commands;
+using OpenWire.Client.Core;
+using OpenWire.Client.IO;
+
+namespace OpenWire.Client.Core {
+
+    /// <summary>
+    /// An implementation of ITransport that uses a TCP socket to communicate with the broker.
+    /// Outgoing commands are written under a lock by the calling thread; a dedicated
+    /// thread runs <see cref="ReadLoop"/> to read incoming commands and dispatch them
+    /// either to the pending request they answer or to the Command event.
+    /// </summary>
+    public class SocketTransport : ITransport {
+        // Serialises writes to the socket, command id allocation and shutdown.
+        private readonly object transmissionLock = new object();
+        private readonly Socket socket;
+        private readonly BinaryReader socketReader;
+        private readonly BinaryWriter socketWriter;
+        private readonly Thread readThread;
+
+        // Written by Dispose() and polled by the read thread, hence volatile.
+        private volatile bool closed;
+
+        // Maps the command id of each in-flight request to its FutureResponse.
+        // Callers add entries while the read thread looks them up and removes them,
+        // so the synchronized wrapper is required.
+        private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable());
+        private short nextCommandId;
+
+        /// <summary>Raised on the read thread for every inbound command that is not a Response.</summary>
+        public event CommandHandler Command;
+
+        /// <summary>
+        /// Raised on the read thread when the broker answers a request with an
+        /// ExceptionResponse, or when reading from the socket fails unexpectedly.
+        /// </summary>
+        public event ExceptionHandler Exception;
+
+        /// <summary>
+        /// Connects to the broker and starts the background read thread.
+        /// </summary>
+        /// <param name="host">Host name or address of the broker.</param>
+        /// <param name="port">TCP port of the broker.</param>
+        /// <exception cref="SocketException">No resolved address could be connected to.</exception>
+        public SocketTransport(string host, int port) {
+            Console.WriteLine("Opening socket to: " + host + " on port: " + port);
+            socket = Connect(host, port);
+            socketWriter = new BinaryWriter(new NetworkStream(socket));
+            socketReader = new BinaryReader(new NetworkStream(socket));
+
+            // now let's create the background read thread
+            readThread = new Thread(new ThreadStart(ReadLoop));
+            readThread.Start();
+        }
+
+        /// <summary>
+        /// Sends a command without asking the broker for a response.
+        /// </summary>
+        /// <param name="command">The command to send; must be a BaseCommand.</param>
+        public void Oneway(Command command) {
+            BaseCommand baseCommand = (BaseCommand) command;
+            baseCommand.CommandId = GetNextCommandId();
+            baseCommand.ResponseRequired = false;
+            Send(command);
+        }
+
+        /// <summary>
+        /// Sends a command that requires a response and returns a future which the
+        /// read thread completes when the matching response arrives.
+        /// </summary>
+        /// <param name="command">The command to send; must be a BaseCommand.</param>
+        /// <returns>The future for the broker's response.</returns>
+        public FutureResponse AsyncRequest(Command command) {
+            BaseCommand baseCommand = (BaseCommand) command;
+            baseCommand.CommandId = GetNextCommandId();
+            baseCommand.ResponseRequired = true;
+
+            // Register the future BEFORE sending: the read thread may receive the
+            // response before Send() returns, and it must be able to find the future.
+            FutureResponse future = new FutureResponse();
+            requestMap[baseCommand.CommandId] = future;
+            try {
+                Send(command);
+            } catch {
+                // Nothing went out, so no response will ever arrive for this id.
+                requestMap.Remove(baseCommand.CommandId);
+                throw;
+            }
+            return future;
+        }
+
+        /// <summary>
+        /// Sends a command and blocks until the broker's response has been received.
+        /// </summary>
+        /// <param name="command">The command to send; must be a BaseCommand.</param>
+        /// <returns>The response read from the broker.</returns>
+        public Response Request(Command command) {
+            FutureResponse response = AsyncRequest(command);
+            return response.Response;
+        }
+
+        /// <summary>
+        /// Closes the socket and its reader/writer. Calling it more than once is harmless.
+        /// </summary>
+        public void Dispose() {
+            Console.WriteLine("Closing the socket");
+            lock (transmissionLock) {
+                if (closed) {
+                    return;
+                }
+                // Flag the shutdown before closing the socket so that the read thread
+                // recognises the resulting read failure as a deliberate close.
+                closed = true;
+                socket.Close();
+            }
+            socketWriter.Close();
+            socketReader.Close();
+        }
+
+        /// <summary>
+        /// Body of the read thread: reads commands until the transport is closed or the
+        /// stream fails. Responses complete the matching pending request; every other
+        /// command is passed to the Command event.
+        /// </summary>
+        public void ReadLoop() {
+            Console.WriteLine("Starting to read commands from ActiveMQ");
+            while (!closed) {
+                BaseCommand command = null;
+                try {
+                    command = (BaseCommand) CommandMarshallerRegistry.ReadCommand(socketReader);
+                } catch (ObjectDisposedException) {
+                    // stream closed
+                    break;
+                } catch (IOException ioError) {
+                    // A read fails with an IOException (EndOfStreamException included) when
+                    // the socket is closed under us or the connection drops. Only report
+                    // it when we did not close the transport ourselves.
+                    if (!closed) {
+                        ExceptionHandler errorHandler = this.Exception;
+                        if (errorHandler != null) {
+                            errorHandler(this, ioError);
+                        }
+                    }
+                    break;
+                }
+                if (command is Response) {
+                    Console.WriteLine("Received response!: " + command);
+                    Response response = (Response) command;
+                    // NOTE(review): requests are keyed by the request's CommandId; confirm that
+                    // the response's CommandId (rather than a correlation id) carries that value.
+                    FutureResponse future = (FutureResponse) requestMap[response.CommandId];
+                    if (future != null) {
+                        // The request is finished either way; drop it so the map does not grow
+                        // without bound and a reused id cannot hit a stale future.
+                        requestMap.Remove(response.CommandId);
+
+                        // Always complete the future, even for an ExceptionResponse, otherwise
+                        // the thread blocked in Request() would wait forever.
+                        future.Response = response;
+
+                        if (response is ExceptionResponse) {
+                            ExceptionResponse er = (ExceptionResponse) response;
+                            ExceptionHandler brokerErrorHandler = this.Exception;
+                            if (brokerErrorHandler != null) {
+                                brokerErrorHandler(this, new BrokerException(er.Exception));
+                            }
+                        }
+                    } else {
+                        Console.WriteLine("Unknown response ID: " + response.CommandId);
+                    }
+                } else {
+                    CommandHandler commandHandler = this.Command;
+                    if (commandHandler != null) {
+                        commandHandler(this, command);
+                    }
+                }
+            }
+        }
+
+
+        // Implementation methods
+
+        /// <summary>
+        /// Marshals the command onto the socket and flushes it; writes are serialised so
+        /// that commands from different threads are not interleaved on the wire.
+        /// </summary>
+        protected void Send(Command command) {
+            lock (transmissionLock) {
+                CommandMarshallerRegistry.WriteCommand(command, socketWriter);
+                socketWriter.Flush();
+            }
+        }
+
+        /// <summary>
+        /// Returns the next command id (previous id plus one).
+        /// </summary>
+        protected short GetNextCommandId() {
+            lock (transmissionLock) {
+                return ++nextCommandId;
+            }
+        }
+
+        /// <summary>
+        /// Resolves the host and returns a socket connected to the first address that
+        /// accepts the connection.
+        /// </summary>
+        /// <exception cref="SocketException">
+        /// None of the resolved addresses could be connected to; when at least one attempt
+        /// was made, this is the error of the last failed attempt.
+        /// </exception>
+        protected Socket Connect(string host, int port) {
+            // Looping through the AddressList allows different type of connections to be tried
+            // (IPv4, IPv6 and whatever else may be available).
+            IPHostEntry hostEntry = Dns.Resolve(host);
+            SocketException lastError = null;
+            foreach (IPAddress address in hostEntry.AddressList) {
+                Socket candidate = null;
+                try {
+                    candidate = new Socket(
+                        address.AddressFamily,
+                        SocketType.Stream,
+                        ProtocolType.Tcp);
+                    candidate.Connect(new IPEndPoint(address, port));
+                    if (candidate.Connected) {
+                        return candidate;
+                    }
+                } catch (SocketException connectError) {
+                    // Remember the failure and fall through to the next address.
+                    lastError = connectError;
+                }
+                // This address did not work out; release the socket before moving on.
+                if (candidate != null) {
+                    candidate.Close();
+                }
+            }
+            if (lastError != null) {
+                throw lastError;
+            }
+            throw new SocketException();
+        }
+    }
+}
Propchange: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs
------------------------------------------------------------------------------
svn:executable = *
Modified: incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/ClientTest.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/ClientTest.cs?rev=368086&r1=368085&r2=368086&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/ClientTest.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/ClientTest.cs Wed Jan 11 10:58:57 2006
@@ -17,10 +17,15 @@
Assert.IsTrue(factory != null, "created valid factory: " + factory);
Console.WriteLine("Worked!");
- /*
+
using (IConnection connection = factory.CreateConnection()) {
ISession session = connection.CreateSession();
+ Console.WriteLine("Created a session: " + session);
+
IDestination destination = session.GetQueue("FOO.BAR");
+ Assert.IsTrue(destination != null, "No queue available!");
+ Console.WriteLine("Using destination: " + destination);
+
IMessageConsumer consumer = session.CreateConsumer(destination);
IMessageProducer producer = session.CreateProducer(destination);
@@ -32,7 +37,6 @@
Assert.AreEqual(expected, message.Text);
}
- */
}
}
}
Modified: incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/OpenWire.Client.build
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/OpenWire.Client.build?rev=368086&r1=368085&r2=368086&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/OpenWire.Client.build (original)
+++ incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/OpenWire.Client.build Wed Jan 11 10:58:57 2006
@@ -29,8 +29,6 @@
<include name="log4net.dll" />
<include name="${build.dir}/bin/${project.name}.dll" />
</references>
-
-
<resources failonempty="false" basedir="Resources"
dynamicprefix="true" prefix="XML:">
<include name="**/*.xml" />