You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2009/09/03 01:38:07 UTC
svn commit: r810734 [3/5] - in /qpid/trunk/qpid/wcf: ./ samples/
samples/Channel/ samples/Channel/WCFToWCFDirect/
samples/Channel/WCFToWCFDirect/Client/
samples/Channel/WCFToWCFDirect/Client/Properties/
samples/Channel/WCFToWCFDirect/Service/ samples/C...
Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs Wed Sep 2 23:38:03 2009
@@ -0,0 +1,592 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+// TODO: flow control
+// timeout handling
+// transactions
+// check if should split into separate input and output classes (little overlap)
+
+namespace Apache.Qpid.Channel
+{
+ using System;
+ using System.Collections;
+ using System.Collections.Generic;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using System.Text;
+ using System.Threading;
+ using System.Globalization;
+ using System.Xml;
+
+ // the thin interop layer that provides access to the Qpid AMQP client libraries
+ using Apache.Qpid.Interop;
+ using Apache.Qpid.AmqpTypes;
+
+ /// <summary>
+ /// WCF client transport channel for accessing AMQP brokers using the Qpid C++ library
+ /// </summary>
+ public class AmqpTransportChannel : ChannelBase, IOutputChannel, IInputChannel
+ {
+ private static readonly EndpointAddress AnonymousAddress =
+ new EndpointAddress("http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous");
+
+ private EndpointAddress remoteAddress;
+ private MessageEncoder encoder;
+ private AmqpChannelProperties factoryChannelProperties;
+ private bool shared;
+ private string encoderContentType;
+
+ // input = 0-10 queue, output = 0-10 exchange
+ private string queueName;
+
+ private String routingKey;
+ private BufferManager bufferManager;
+ private AmqpProperties outputMessageProperties;
+
+ private InputLink inputLink;
+ private OutputLink outputLink;
+
+ private bool isInputChannel;
+ private bool streamed;
+
+ private AsyncTimeSpanCaller asyncOpenCaller;
+ private AsyncTimeSpanCaller asyncCloseCaller;
+
+ internal AmqpTransportChannel(ChannelManagerBase factory, AmqpChannelProperties channelProperties, EndpointAddress remoteAddress, MessageEncoder msgEncoder, long maxBufferPoolSize, bool sharedConnection)
+ : base(factory)
+ {
+ this.isInputChannel = (factory is ChannelListenerBase) || (factory is AmqpChannelFactory<IInputChannel>);
+
+ if (remoteAddress == null)
+ {
+ throw new ArgumentException("Null Endpoint Address");
+ }
+
+ this.factoryChannelProperties = channelProperties;
+ this.shared = sharedConnection;
+ this.remoteAddress = remoteAddress;
+
+ // pull out host, port, queue, and connection arguments
+ this.ParseAmqpUri(remoteAddress.Uri);
+
+ this.encoder = msgEncoder;
+ string ct = String.Empty;
+ if (this.encoder != null)
+ {
+ ct = this.encoder.ContentType;
+ if (ct != null)
+ {
+ int pos = ct.IndexOf(';');
+ if (pos != -1)
+ {
+ ct = ct.Substring(0, pos).Trim();
+ }
+ }
+ else
+ {
+ ct = "application/octet-stream";
+ }
+ }
+
+ this.encoderContentType = ct;
+
+ if (this.factoryChannelProperties.TransferMode == TransferMode.Streamed)
+ {
+ this.streamed = true;
+ }
+ else
+ {
+ if (!(this.factoryChannelProperties.TransferMode == TransferMode.Buffered))
+ {
+ throw new ArgumentException("TransferMode mode must be \"Streamed\" or \"Buffered\"");
+ }
+
+ this.streamed = false;
+ }
+
+ this.bufferManager = BufferManager.CreateBufferManager(maxBufferPoolSize, int.MaxValue);
+
+ this.asyncOpenCaller = new AsyncTimeSpanCaller(this.OnOpen);
+ this.asyncCloseCaller = new AsyncTimeSpanCaller(this.OnClose);
+
+ if (this.isInputChannel)
+ {
+ this.inputLink = ConnectionManager.GetInputLink(this.factoryChannelProperties, shared, false, this.queueName);
+ }
+ else
+ {
+ this.outputLink = ConnectionManager.GetOutputLink(this.factoryChannelProperties, shared, false, this.queueName);
+ }
+ }
+
+ private delegate bool AsyncTryReceiveCaller(TimeSpan timeout, out Message message);
+
+ private delegate void AsyncTimeSpanCaller(TimeSpan timeout);
+
+ EndpointAddress IOutputChannel.RemoteAddress
+ {
+ get
+ {
+ return this.remoteAddress;
+ }
+ }
+
+ // i.e what you would insert into a ReplyTo header to reach
+ // here. Presumably should be exchange/link and routing info,
+ // rather than the actual input queue name.
+ EndpointAddress IInputChannel.LocalAddress
+ {
+ get
+ {
+ // TODO: something better
+ return AnonymousAddress;
+ }
+ }
+
+ AmqpProperties OutputMessageProperties
+ {
+ get
+ {
+ if (this.outputMessageProperties == null)
+ {
+ this.outputMessageProperties = this.factoryChannelProperties.DefaultMessageProperties;
+ if (this.outputMessageProperties == null)
+ {
+ this.outputMessageProperties = new AmqpProperties();
+ }
+ }
+
+ return this.outputMessageProperties;
+ }
+ }
+
+ Uri IOutputChannel.Via
+ {
+ get
+ {
+ return this.remoteAddress.Uri;
+ }
+ }
+
+ public override T GetProperty<T>()
+ {
+ if (typeof(T) == typeof(IInputChannel))
+ {
+ if (this.isInputChannel)
+ {
+ return (T)(object)this;
+ }
+ }
+ else if (typeof(T) == typeof(IOutputChannel))
+ {
+ if (!this.isInputChannel)
+ {
+ return (T)(object)this;
+ }
+ }
+
+ return base.GetProperty<T>();
+ }
+
+ public void Send(Message message, TimeSpan timeout)
+ {
+ this.ThrowIfDisposedOrNotOpen();
+ AmqpChannelHelpers.ValidateTimeout(timeout);
+
+ try
+ {
+ using (AmqpMessage amqpMessage = this.WcfToQpid(message))
+ {
+ this.outputLink.Send(amqpMessage, timeout);
+ }
+ }
+ finally
+ {
+ message.Close();
+ }
+ }
+
+ public void Send(Message message)
+ {
+ this.Send(message, this.DefaultSendTimeout);
+ }
+
+ public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
+ {
+ this.ThrowIfDisposedOrNotOpen();
+ AmqpChannelHelpers.ValidateTimeout(timeout);
+
+ try
+ {
+ using (AmqpMessage amqpMessage = this.WcfToQpid(message))
+ {
+ return this.outputLink.BeginSend(amqpMessage, timeout, callback, state);
+ }
+ }
+ finally
+ {
+ message.Close();
+ }
+ }
+
+ public IAsyncResult BeginSend(Message message, AsyncCallback callback, object state)
+ {
+ return this.BeginSend(message, this.DefaultSendTimeout, callback, state);
+ }
+
+ public void EndSend(IAsyncResult result)
+ {
+ this.outputLink.EndSend(result);
+ }
+
+ public Message Receive(TimeSpan timeout)
+ {
+ Message message;
+ if (this.TryReceive(timeout, out message))
+ {
+ return message;
+ }
+ else
+ {
+ throw new TimeoutException("Receive");
+ }
+ }
+
+ public Message Receive()
+ {
+ return this.Receive(this.DefaultReceiveTimeout);
+ }
+
+ public bool TryReceive(TimeSpan timeout, out Message message)
+ {
+ this.ThrowIfDisposedOrNotOpen();
+ AmqpMessage amqpMessage;
+ message = null;
+
+ if (this.inputLink.TryReceive(timeout, out amqpMessage))
+ {
+ message = this.QpidToWcf(amqpMessage);
+ return true;
+ }
+
+ return false;
+ }
+
+ public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
+ {
+ return this.inputLink.BeginTryReceive(timeout, callback, state);
+ }
+
+ public bool EndTryReceive(IAsyncResult result, out Message message)
+ {
+ AmqpMessage amqpMessage = null;
+ if (!this.inputLink.EndTryReceive(result, out amqpMessage))
+ {
+ message = null;
+ return false;
+ }
+ message = QpidToWcf(amqpMessage);
+ return true;
+ }
+
+ public bool WaitForMessage(TimeSpan timeout)
+ {
+ return this.inputLink.WaitForMessage(timeout);
+ }
+
+ public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
+ {
+ return this.inputLink.BeginTryReceive(timeout, callback, state);
+ }
+
+ public IAsyncResult BeginReceive(AsyncCallback callback, object state)
+ {
+ return this.BeginReceive(this.DefaultReceiveTimeout, callback, state);
+ }
+
+ public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
+ {
+ return this.inputLink.BeginWaitForMessage(timeout, callback, state);
+ }
+
+ public Message EndReceive(IAsyncResult result)
+ {
+ Message message;
+ if (this.EndTryReceive(result, out message))
+ {
+ return message;
+ }
+ else
+ {
+ throw new TimeoutException("EndReceive");
+ }
+ }
+
+ public bool EndWaitForMessage(IAsyncResult result)
+ {
+ return this.inputLink.EndWaitForMessage(result);
+ }
+
+ public void CloseEndPoint()
+ {
+ if (this.inputLink != null)
+ {
+ this.inputLink.Close();
+ }
+ if (this.outputLink != null)
+ {
+ this.outputLink.Close();
+ }
+ }
+
+ /// <summary>
+ /// Open connection to Broker
+ /// </summary>
+ protected override void OnOpen(TimeSpan timeout)
+ {
+ // TODO: move open logic to here from constructor
+ }
+
+ protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
+ {
+ return this.asyncOpenCaller.BeginInvoke(timeout, callback, state);
+ }
+
+ protected override void OnEndOpen(IAsyncResult result)
+ {
+ this.asyncOpenCaller.EndInvoke(result);
+ }
+
+ protected override void OnAbort()
+ {
+ //// TODO: check for network-less qpid teardown or launch special thread
+ this.Cleanup();
+ }
+
+ /// <summary>
+ /// Shutdown gracefully
+ /// </summary>
+ protected override void OnClose(TimeSpan timeout)
+ {
+ this.CloseEndPoint();
+ this.Cleanup();
+ }
+
+ protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
+ {
+ return this.asyncCloseCaller.BeginInvoke(timeout, callback, state);
+ }
+
+ protected override void OnEndClose(IAsyncResult result)
+ {
+ this.asyncCloseCaller.EndInvoke(result);
+ }
+
+ private AmqpMessage WcfToQpid(Message wcfMessage)
+ {
+ object obj;
+ AmqpProperties applicationProperties = null;
+ bool success = false;
+ AmqpMessage amqpMessage = null;
+
+ if (wcfMessage.Properties.TryGetValue("AmqpProperties", out obj))
+ {
+ applicationProperties = obj as AmqpProperties;
+ }
+
+ try
+ {
+ AmqpProperties outgoingProperties = new AmqpProperties();
+
+ // Start with AMQP properties from the binding and the URI
+ if (this.factoryChannelProperties.DefaultMessageProperties != null)
+ {
+ outgoingProperties.MergeFrom(this.factoryChannelProperties.DefaultMessageProperties);
+ }
+
+ if (this.routingKey != null)
+ {
+ outgoingProperties.RoutingKey = this.routingKey;
+ }
+
+ // Add the Properties set by the application on this particular message.
+ // Application properties trump channel properties
+ if (applicationProperties != null)
+ {
+ outgoingProperties.MergeFrom(applicationProperties);
+ }
+
+ amqpMessage = this.outputLink.CreateMessage();
+ amqpMessage.Properties = outgoingProperties;
+
+ // copy the WCF message body to the AMQP message body
+ if (this.streamed)
+ {
+ this.encoder.WriteMessage(wcfMessage, amqpMessage.BodyStream);
+ }
+ else
+ {
+ ArraySegment<byte> encodedBody = this.encoder.WriteMessage(wcfMessage, int.MaxValue, this.bufferManager);
+ try
+ {
+ amqpMessage.BodyStream.Write(encodedBody.Array, encodedBody.Offset, encodedBody.Count);
+ }
+ finally
+ {
+ this.bufferManager.ReturnBuffer(encodedBody.Array);
+ }
+ }
+
+ success = true;
+ }
+ finally
+ {
+ if (!success && (amqpMessage != null))
+ {
+ amqpMessage.Dispose();
+ }
+ }
+ return amqpMessage;
+ }
+
+
+ private Message QpidToWcf(AmqpMessage amqpMessage)
+ {
+ if (amqpMessage == null)
+ {
+ return null;
+ }
+
+ Message wcfMessage = null;
+ byte[] managedBuffer = null;
+
+ try
+ {
+ if (this.streamed)
+ {
+ wcfMessage = this.encoder.ReadMessage(amqpMessage.BodyStream, int.MaxValue);
+ }
+ else
+ {
+ int count = (int)amqpMessage.BodyStream.Length;
+ managedBuffer = this.bufferManager.TakeBuffer(count);
+ int nr = amqpMessage.BodyStream.Read(managedBuffer, 0, count);
+ ArraySegment<byte> bufseg = new ArraySegment<byte>(managedBuffer, 0, count);
+
+ wcfMessage = this.encoder.ReadMessage(bufseg, this.bufferManager);
+
+ // set to null for finally{} block, since the encoder is now responsible for
+ // returning the BufferManager memory
+ managedBuffer = null;
+ }
+
+ // This message will be discarded unless the "To" header matches
+ // the WCF endpoint dispatcher's address filter (or the service is
+ // AddressFilterMode=AddressFilterMode.Any).
+
+ this.remoteAddress.ApplyTo(wcfMessage);
+
+ if (amqpMessage.Properties != null)
+ {
+ wcfMessage.Properties.Add("AmqpProperties", amqpMessage.Properties);
+ }
+ }
+ catch (XmlException xmlException)
+ {
+ throw new ProtocolException(
+ "There is a problem with the XML that was received from the network. See inner exception for more details.",
+ xmlException);
+ }
+ catch (Exception e)
+ {
+ // TODO: logging
+ Console.WriteLine("TX channel encoder exception " + e);
+ }
+ finally
+ {
+ // close the amqpMessage unless the body will be read at a later time.
+ if (!this.streamed || wcfMessage == null)
+ {
+ amqpMessage.Close();
+ }
+
+ // the handoff to the encoder failed
+ if (managedBuffer != null)
+ {
+ this.bufferManager.ReturnBuffer(managedBuffer);
+ }
+ }
+
+ return wcfMessage;
+ }
+
+ private void Cleanup()
+ {
+ this.bufferManager.Clear();
+ }
+
+ // "amqp:queue1" | "amqp:stocks@broker1.com" | "amqp:queue3?routingkey=key"
+ private void ParseAmqpUri(Uri uri)
+ {
+ if (uri.Scheme != AmqpConstants.Scheme)
+ {
+ throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
+ "The scheme {0} specified in address is not supported.", uri.Scheme), "uri");
+ }
+
+ this.queueName = uri.LocalPath;
+
+ if ((this.queueName.IndexOf('@') != -1) && this.isInputChannel)
+ {
+ throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
+ "Invalid input queue name: \"{0}\" specified.", this.queueName), "uri");
+ }
+
+ // search out session parameters in the query portion of the URI
+
+ string routingParseKey = "routingkey=";
+ char[] charSeparators = new char[] { '?', ';' };
+ string[] args = uri.Query.Split(charSeparators, StringSplitOptions.RemoveEmptyEntries);
+ foreach (string s in args)
+ {
+ if (s.StartsWith(routingParseKey))
+ {
+ this.routingKey = s.Substring(routingParseKey.Length);
+ }
+ }
+
+ if (this.queueName == String.Empty)
+ {
+ if (this.isInputChannel)
+ {
+ throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
+ "Empty queue target specifier not allowed."), "uri");
+ }
+ else
+ {
+ if (this.routingKey == null)
+ {
+ throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
+ "No target queue or routing key specified."), "uri");
+ }
+ }
+ }
+ }
+ }
+}
Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj Wed Sep 2 23:38:03 2009
@@ -0,0 +1,102 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="3.5" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>9.0.21022</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{8AABAB30-7D1E-4539-B7D1-05450262BAD2}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Apache.Qpid.Channel</RootNamespace>
+ <AssemblyName>Apache.Qpid.Channel</AssemblyName>
+ <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ <StartupObject>
+ </StartupObject>
+ <SignAssembly>false</SignAssembly>
+ <AssemblyOriginatorKeyFile>
+ </AssemblyOriginatorKeyFile>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+ <ItemGroup>
+ <Compile Include="AmqpBinaryBinding.cs" />
+ <Compile Include="AmqpBinaryBindingCollectionElement.cs" />
+ <Compile Include="AmqpBinaryBindingConfigurationElement.cs" />
+ <Compile Include="AmqpChannelFactory.cs" />
+ <Compile Include="AmqpChannelHelpers.cs" />
+ <Compile Include="AmqpChannelListener.cs" />
+ <Compile Include="AmqpBinding.cs" />
+ <Compile Include="AmqpBindingCollectionElement.cs" />
+ <Compile Include="AmqpBindingConfigurationElement.cs" />
+ <Compile Include="AmqpTransportBindingElement.cs" />
+ <Compile Include="AmqpTransportChannel.cs" />
+ <Compile Include="ConnectionManager.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="RawMessage.cs" />
+ <Compile Include="RawMessageEncoder.cs" />
+ <Compile Include="RawMessageEncoderFactory.cs" />
+ <Compile Include="RawMessageEncodingBindingElement.cs" />
+ <Compile Include="RawXmlReader.cs" />
+ <Compile Include="RawXmlWriter.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <Reference Include="System" />
+ <Reference Include="System.configuration" />
+ <Reference Include="System.Runtime.Serialization">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.ServiceModel">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.XML" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\Interop\Interop.vcproj">
+ <Project>{C9B6AC75-6332-47A4-B82B-0C20E0AF2D34}</Project>
+ <Name>Interop</Name>
+ </ProjectReference>
+ </ItemGroup>
+</Project>
\ No newline at end of file
Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs Wed Sep 2 23:38:03 2009
@@ -0,0 +1,266 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+ using System;
+ using System.Collections;
+ using System.Collections.Generic;
+ using System.Threading;
+
+ using Apache.Qpid.Interop;
+
+ // The ConnectionManager looks after a shareable pool of AmqpConnection and AmqpSession
+ // objects. If two connection requests could be shared (see MakeKey() properties), and
+ // are designated as shareable, then they will be paired up. Each shared connection is
+ // a separate instance of a ManagedConnection. All unshared connections use a single
+ // instance of ManagedConnection with locking turned off. The ManagedConnection object
+ // registers for notifictation when a connection goes idle (all grandchild InputLink and
+ // OutputLink objects have been closed), and closes the connection.
+
+ // TODO: the session sharing is roughed-in via comments but needs completing.
+
+ internal sealed class ConnectionManager
+ {
+ // A side effect of creating InputLinks and OutputLinks is that counters
+ // in the respective AmqpSession and AmqpConnection are updated, so care
+ // must be taken to hold the lock across acquiring a session and opening
+ // a link on it.
+
+ // one for each shared connection
+ private static Dictionary<string, ManagedConnection> sharedInstances;
+
+ // this one creates and releases connections that are not shared. No locking required.
+ private static ManagedConnection unsharedInstance;
+
+ // lock for finding or creating ManagedConnection instances
+ private static Object connectionLock;
+
+ static ConnectionManager()
+ {
+ unsharedInstance = null;
+ sharedInstances = new Dictionary<string, ManagedConnection>();
+ connectionLock = new Object();
+ }
+
+ private static string MakeKey(AmqpChannelProperties props)
+ {
+ return props.BrokerHost + ':' + props.BrokerPort + ':' + props.TransferMode;
+ }
+
+ private static ManagedConnection GetManagedConnection(AmqpChannelProperties channelProperties, bool connectionSharing)
+ {
+ if (connectionSharing)
+ {
+ string key = MakeKey(channelProperties);
+ lock (connectionLock)
+ {
+ ManagedConnection mc = null;
+ if (!sharedInstances.TryGetValue(key, out mc))
+ {
+ mc = new ManagedConnection(true);
+ sharedInstances.Add(key, mc);
+ }
+ return mc;
+ }
+ }
+ else
+ {
+ lock (connectionLock)
+ {
+ if (unsharedInstance == null)
+ {
+ unsharedInstance = new ManagedConnection(false);
+ }
+ return unsharedInstance;
+ }
+ }
+ }
+
+ public static OutputLink GetOutputLink(AmqpChannelProperties channelProperties, bool connectionSharing, bool sessionSharing, string qname)
+ {
+ ManagedConnection mc = GetManagedConnection(channelProperties, connectionSharing);
+ return (OutputLink)mc.GetLink(channelProperties, sessionSharing, null, qname);
+ }
+
+ public static InputLink GetInputLink(AmqpChannelProperties channelProperties, bool connectionSharing, bool sessionSharing, string qname)
+ {
+ ManagedConnection mc = GetManagedConnection(channelProperties, connectionSharing);
+ return (InputLink)mc.GetLink(channelProperties, sessionSharing, qname, null);
+ }
+
+
+
+ class ManagedConnection
+ {
+ private Boolean shared;
+ private AmqpConnection sharedConnection;
+ //private Dictionary<string, AmqpSession> sharedSessions;
+
+ public ManagedConnection(bool shared)
+ {
+ this.shared = shared;
+ }
+
+
+ public object GetLink(AmqpChannelProperties channelProperties, bool sessionSharing, string inputQueue, string outputQueue)
+ {
+ AmqpConnection connection = null;
+ AmqpSession session = null;
+ Object link = null;
+ bool newConnection = false;
+ //bool newSession = false;
+ bool success = false;
+
+ // when called in the non-shared case, only stack variables should be used for holding connections/sessions/links
+
+ if (this.shared)
+ {
+ Monitor.Enter(this); // lock
+ }
+
+ try
+ {
+ if (this.shared)
+ {
+ // TODO: check shared connection not closed (i.e. network drop) and refresh this instance if needed
+ if (sessionSharing)
+ {
+ throw new NotImplementedException("shared session");
+ /* * ... once we have a defined shared session config parameter:
+
+ // lazilly create
+ if (this.sharedSessions == null)
+ {
+ this.sharedSessions = new Dictionary<string, AmqpSession>();
+ }
+
+ alreadydeclaredstring sessionKey = channelProperties.name_of_key_goes_here;
+ this.sharedSessions.TryGetValue(sessionKey, out session);
+
+ * */
+ }
+
+ if (this.sharedConnection != null)
+ {
+ connection = this.sharedConnection;
+ }
+ }
+
+ if (connection == null)
+ {
+ connection = new AmqpConnection(channelProperties.BrokerHost, channelProperties.BrokerPort);
+ newConnection = true;
+ if (this.shared)
+ {
+ connection.OnConnectionIdle += new ConnectionIdleEventHandler(this.IdleConnectionHandler);
+ }
+ else
+ {
+ connection.OnConnectionIdle += new ConnectionIdleEventHandler(UnsharedIdleConnectionHandler);
+ }
+ }
+
+ if (session == null)
+ {
+ session = connection.CreateSession();
+ //newSession = true;
+ }
+
+ if (inputQueue != null)
+ {
+ link = session.CreateInputLink(inputQueue);
+ }
+ else
+ {
+ link = session.CreateOutputLink(outputQueue);
+ }
+
+ if (this.shared)
+ {
+ if (newConnection)
+ {
+ this.sharedConnection = connection;
+ }
+ /*
+ if (newSession)
+ {
+ sharedSessions.Add(foo, session);
+ }
+ * */
+ }
+
+ success = true;
+ }
+ finally
+ {
+ if (this.shared)
+ {
+ Monitor.Exit(this);
+ }
+ if (!success)
+ {
+ /*
+ if (newSession)
+ {
+ session.Close();
+ }
+ */
+ if (newConnection)
+ {
+ connection.Close();
+ }
+ }
+ }
+
+ return link;
+ }
+
+
+ static void UnsharedIdleConnectionHandler(Object sender, EventArgs empty)
+ {
+ if (sender is AmqpConnection)
+ {
+ AmqpConnection connection = (AmqpConnection)sender;
+ connection.Close();
+ }
+ }
+
+ void IdleConnectionHandler(Object sender, EventArgs empty)
+ {
+ lock (this)
+ {
+ if (sharedConnection != sender || sharedConnection == null)
+ {
+ return;
+ }
+ if (!sharedConnection.IsIdle)
+ {
+ // Another thread made the connection busy again.
+ // That's OK. Another idle event will come along later.
+ return;
+ }
+ sharedConnection.Close(); // also closes all child sessions
+ sharedConnection = null;
+ //sharedSessions = null;
+ }
+ }
+ }
+ }
+}
Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Properties/AssemblyInfo.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Properties/AssemblyInfo.cs?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Properties/AssemblyInfo.cs (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Properties/AssemblyInfo.cs Wed Sep 2 23:38:03 2009
@@ -0,0 +1,52 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Apache.Qpid.Channel")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("")]
+[assembly: AssemblyCopyright("")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("ac02bbb0-2c19-43fb-a36c-b1b0a50eaf1a")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Properties/AssemblyInfo.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Properties/AssemblyInfo.cs
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs Wed Sep 2 23:38:03 2009
@@ -0,0 +1,374 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+ using System;
+ using System.IO;
+ using System.ServiceModel.Channels;
+ using System.Xml;
+
+ // This incoming Message is backed either by a Stream (bodyStream) or a byte array (bodyBytes).
+ // If bodyBytes belongs to a BufferManager, we must return it when done.
+ // The pay-off is OnGetReaderAtBodyContents().
+ // Most of the complexity is dealing with the OnCreateBufferedCopy() machinery.
+ internal class RawMessage : Message
+ {
+ private MessageHeaders headers;
+ private MessageProperties properties;
+ private XmlDictionaryReaderQuotas readerQuotas;
+ private Stream bodyStream;
+ private byte[] bodyBytes;
+ private int index;
+ private int count;
+ private BufferManager bufferManager;
+
+ public RawMessage(byte[] buffer, int index, int count, BufferManager bufferManager, XmlDictionaryReaderQuotas quotas)
+ {
+ // this constructor supports MessageEncoder.ReadMessage(ArraySegment<byte> b, BufferManager mgr, string contentType)
+ if (quotas == null)
+ {
+ quotas = new XmlDictionaryReaderQuotas();
+ }
+
+ this.headers = new MessageHeaders(MessageVersion.None);
+ this.properties = new MessageProperties();
+ this.readerQuotas = quotas;
+ this.bodyBytes = buffer;
+ this.index = index;
+ this.count = count;
+ this.bufferManager = bufferManager;
+ }
+
+ public RawMessage(Stream stream, XmlDictionaryReaderQuotas quotas)
+ {
+ // this constructor supports MessageEncoder.ReadMessage(System.IO.Stream s, int max, string contentType)
+ if (quotas == null)
+ {
+ quotas = new XmlDictionaryReaderQuotas();
+ }
+
+ this.headers = new MessageHeaders(MessageVersion.None);
+ this.properties = new MessageProperties();
+ this.bodyStream = stream;
+ }
+
+ public RawMessage(MessageHeaders headers, MessageProperties properties, byte[] bytes, int index, int count, XmlDictionaryReaderQuotas quotas)
+ {
+ // this constructor supports internal needs for CreateBufferedCopy().CreateMessage()
+ this.headers = new MessageHeaders(headers);
+ this.properties = new MessageProperties(properties);
+ this.bodyBytes = bytes;
+ this.index = index;
+ this.count = count;
+ this.readerQuotas = quotas;
+ }
+
+ public override MessageHeaders Headers
+ {
+ get
+ {
+ if (this.IsDisposed)
+ {
+ throw new ObjectDisposedException("message");
+ }
+
+ return this.headers;
+ }
+ }
+
+ public override bool IsEmpty
+ {
+ get
+ {
+ if (this.IsDisposed)
+ {
+ throw new ObjectDisposedException("message");
+ }
+
+ return false;
+ }
+ }
+
+ public override bool IsFault
+ {
+ get
+ {
+ if (this.IsDisposed)
+ {
+ throw new ObjectDisposedException("message");
+ }
+
+ return false;
+ }
+ }
+
+ public override MessageProperties Properties
+ {
+ get
+ {
+ if (this.IsDisposed)
+ {
+ throw new ObjectDisposedException("message");
+ }
+
+ return this.properties;
+ }
+ }
+
+ public override MessageVersion Version
+ {
+ get
+ {
+ if (this.IsDisposed)
+ {
+ throw new ObjectDisposedException("message");
+ }
+
+ return MessageVersion.None;
+ }
+ }
+
+ protected override void OnBodyToString(XmlDictionaryWriter writer)
+ {
+ if (this.bodyStream != null)
+ {
+ writer.WriteString("Stream");
+ }
+ else
+ {
+ writer.WriteStartElement(RawMessageEncoder.StreamElementName, string.Empty);
+ writer.WriteBase64(this.bodyBytes, this.index, this.count);
+ writer.WriteEndElement();
+ }
+ }
+
+ protected override void OnClose()
+ {
+ Exception deferEx = null;
+ try
+ {
+ base.OnClose();
+ }
+ catch (Exception e)
+ {
+ deferEx = e;
+ }
+
+ try
+ {
+ if (this.properties != null)
+ {
+ this.properties.Dispose();
+ }
+ }
+ catch (Exception e)
+ {
+ if (deferEx == null)
+ {
+ deferEx = e;
+ }
+ }
+
+ try
+ {
+ if (this.bufferManager != null)
+ {
+ this.bufferManager.ReturnBuffer(this.bodyBytes);
+ this.bufferManager = null;
+ }
+ }
+ catch (Exception e)
+ {
+ if (deferEx == null)
+ {
+ deferEx = e;
+ }
+ }
+
+ if (deferEx != null)
+ {
+ throw deferEx;
+ }
+ }
+
+ protected override MessageBuffer OnCreateBufferedCopy(int maxBufferSize)
+ {
+ if (this.bodyStream != null)
+ {
+ int len = (int)this.bodyStream.Length;
+ byte[] buf = new byte[len];
+ this.bodyStream.Read(buf, 0, len);
+ this.bodyStream = null;
+ this.bodyBytes = buf;
+ this.count = len;
+ this.index = 0;
+ }
+ else
+ {
+ if (this.bufferManager != null)
+ {
+ // we could take steps to share the buffer among copies and release the memory
+ // after the last user finishes by a reference count or such, but we are already
+ // far from the intended optimized use. Make one GC managed memory copy that is
+ // shared by all.
+ byte[] buf = new byte[this.count];
+
+ Buffer.BlockCopy(this.bodyBytes, this.index, buf, 0, this.count);
+ this.bufferManager.ReturnBuffer(this.bodyBytes);
+ this.bufferManager = null;
+ this.bodyBytes = buf;
+ this.index = 0;
+ }
+ }
+
+ return new RawMessageBuffer(this.headers, this.properties, this.bodyBytes, this.index, this.count, this.readerQuotas);
+ }
+
+ protected override XmlDictionaryReader OnGetReaderAtBodyContents()
+ {
+ Stream readerStream = null;
+ bool ownsStream;
+
+ if (this.bodyStream != null)
+ {
+ readerStream = this.bodyStream;
+ ownsStream = false;
+ }
+ else
+ {
+ // create stream for duration of XmlReader.
+ ownsStream = true;
+ if (this.bufferManager != null)
+ {
+ readerStream = new RawMemoryStream(this.bodyBytes, this.index, this.count, this.bufferManager);
+ this.bufferManager = null;
+ }
+ else
+ {
+ readerStream = new MemoryStream(this.bodyBytes, this.index, this.count, false);
+ }
+ }
+
+ return new RawXmlReader(readerStream, this.readerQuotas, ownsStream);
+ }
+
+ protected override void OnWriteBodyContents(XmlDictionaryWriter writer)
+ {
+ writer.WriteStartElement(RawMessageEncoder.StreamElementName, string.Empty);
+ if (this.bodyStream != null)
+ {
+ int len = (int)this.bodyStream.Length;
+ byte[] buf = new byte[len];
+ this.bodyStream.Read(buf, 0, len);
+ writer.WriteBase64(buf, 0, len);
+ }
+ else
+ {
+ writer.WriteBase64(this.bodyBytes, this.index, this.count);
+ }
+
+ writer.WriteEndElement();
+ }
+
+ private class RawMemoryStream : MemoryStream
+ {
+ private BufferManager bufferManager;
+ private byte[] buffer;
+
+ public RawMemoryStream(byte[] bytes, int index, int count, BufferManager mgr)
+ : base(bytes, index, count, false)
+ {
+ this.bufferManager = mgr;
+ this.buffer = bytes;
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ if (this.bufferManager != null)
+ {
+ try
+ {
+ this.bufferManager.ReturnBuffer(this.buffer);
+ }
+ finally
+ {
+ this.bufferManager = null;
+ base.Dispose(disposing);
+ }
+ }
+ }
+ }
+
+ private class RawMessageBuffer : MessageBuffer
+ {
+ private bool closed;
+ private MessageHeaders headers;
+ private MessageProperties properties;
+ private byte[] bodyBytes;
+ private int index;
+ private int count;
+ private XmlDictionaryReaderQuotas readerQuotas;
+
+ public RawMessageBuffer(MessageHeaders headers, MessageProperties properties, byte[] bytes, int index, int count, XmlDictionaryReaderQuotas quotas)
+ : base()
+ {
+ this.headers = new MessageHeaders(headers);
+ this.properties = new MessageProperties(properties);
+ this.bodyBytes = bytes;
+ this.index = index;
+ this.count = count;
+ this.readerQuotas = new XmlDictionaryReaderQuotas();
+ quotas.CopyTo(this.readerQuotas);
+ }
+
+ public override int BufferSize
+ {
+ get { return this.count; }
+ }
+
+ public override void Close()
+ {
+ if (!this.closed)
+ {
+ this.closed = true;
+ this.headers = null;
+ if (this.properties != null)
+ {
+ this.properties.Dispose();
+ this.properties = null;
+ }
+
+ this.bodyBytes = null;
+ this.readerQuotas = null;
+ }
+ }
+
+ public override Message CreateMessage()
+ {
+ if (this.closed)
+ {
+ throw new ObjectDisposedException("message");
+ }
+
+ return new RawMessage(this.headers, this.properties, this.bodyBytes, this.index, this.count, this.readerQuotas);
+ }
+ }
+ }
+}
Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoder.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoder.cs?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoder.cs (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoder.cs Wed Sep 2 23:38:03 2009
@@ -0,0 +1,113 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+ using System;
+ using System.IO;
+ using System.ServiceModel.Channels;
+ using System.ServiceModel;
+ using System.Xml;
+
+
+ class RawMessageEncoder : MessageEncoder
+ {
+ public const string StreamElementName = "Binary";
+
+ XmlDictionaryReaderQuotas readerQuotas;
+
+ public RawMessageEncoder(XmlDictionaryReaderQuotas quotas)
+ {
+ this.readerQuotas = new XmlDictionaryReaderQuotas();
+ if (quotas != null)
+ {
+ quotas.CopyTo(this.readerQuotas);
+ }
+ }
+
+ public override string ContentType
+ {
+ get { return null; }
+ }
+
+ public override bool IsContentTypeSupported(string contentType)
+ {
+ return true;
+ }
+
+ public override string MediaType
+ {
+ get { return null; }
+ }
+
+ public override MessageVersion MessageVersion
+ {
+ get { return MessageVersion.None; }
+ }
+
+ public override Message ReadMessage(ArraySegment<byte> buffer, BufferManager bufferManager, string contentType)
+ {
+ RawMessage message = new RawMessage(buffer.Array, buffer.Offset, buffer.Count, bufferManager, readerQuotas);
+ message.Properties.Encoder = this;
+ return message;
+ }
+
+ public override Message ReadMessage(Stream stream, int maxSizeOfHeaders, string contentType)
+ {
+ RawMessage message = new RawMessage(stream, readerQuotas);
+ message.Properties.Encoder = this;
+ return message;
+ }
+
+ private void CheckType(XmlDictionaryReader reader, XmlNodeType type)
+ {
+ if (reader.NodeType != type)
+ {
+ throw new System.IO.InvalidDataException(String.Format("RawMessageEncoder xml check {0} type should be {1}", type, reader.NodeType));
+ }
+ }
+
+ public override ArraySegment<byte> WriteMessage(Message message, int maxMessageSize, BufferManager bufferManager, int messageOffset)
+ {
+ MemoryStream tempStream = new MemoryStream();
+ this.WriteMessage(message, tempStream);
+ int len = messageOffset + (int)tempStream.Length;
+ byte[] buf = bufferManager.TakeBuffer(len);
+ MemoryStream targetStream = new MemoryStream(buf);
+ if (messageOffset > 0)
+ {
+ targetStream.Seek(messageOffset, SeekOrigin.Begin);
+ }
+
+ tempStream.WriteTo(targetStream);
+ targetStream.Close();
+
+ return new ArraySegment<byte>(buf, messageOffset, len - messageOffset);
+ }
+
+ public override void WriteMessage(Message message, Stream stream)
+ {
+ using (XmlWriter writer = new RawXmlWriter(stream))
+ {
+ message.WriteMessage(writer);
+ writer.Flush();
+ }
+ }
+ }
+}
Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoder.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoder.cs
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoderFactory.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoderFactory.cs?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoderFactory.cs (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoderFactory.cs Wed Sep 2 23:38:03 2009
@@ -0,0 +1,45 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+ using System;
+ using System.Xml;
+ using System.ServiceModel.Channels;
+
+ internal class RawMessageEncoderFactory : MessageEncoderFactory
+ {
+ RawMessageEncoder encoder;
+
+ public RawMessageEncoderFactory(XmlDictionaryReaderQuotas quotas)
+ {
+ this.encoder = new RawMessageEncoder(quotas);
+ }
+
+ public override MessageEncoder Encoder
+ {
+ get { return this.encoder; }
+ }
+
+ public override MessageVersion MessageVersion
+ {
+ get { return encoder.MessageVersion; }
+ }
+ }
+}
Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoderFactory.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoderFactory.cs
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncodingBindingElement.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncodingBindingElement.cs?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncodingBindingElement.cs (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncodingBindingElement.cs Wed Sep 2 23:38:03 2009
@@ -0,0 +1,102 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+ using System;
+ using System.ServiceModel.Channels;
+
+ public class RawMessageEncodingBindingElement : MessageEncodingBindingElement
+ {
+
+ public RawMessageEncodingBindingElement()
+ : base()
+ {
+ }
+
+ RawMessageEncodingBindingElement(RawMessageEncodingBindingElement originalBindingElement)
+ {
+ }
+
+ public override MessageEncoderFactory CreateMessageEncoderFactory()
+ {
+ return new RawMessageEncoderFactory(null);
+ }
+
+
+ public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
+ {
+ if (context == null)
+ throw new ArgumentNullException("context");
+
+ context.BindingParameters.Add(this);
+ return context.BuildInnerChannelFactory<TChannel>();
+ }
+
+ public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
+ {
+ if (context == null)
+ throw new ArgumentNullException("context");
+
+ return context.CanBuildInnerChannelFactory<TChannel>();
+ }
+
+ public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
+ {
+ if (context == null)
+ throw new ArgumentNullException("context");
+
+ context.BindingParameters.Add(this);
+ return context.BuildInnerChannelListener<TChannel>();
+ }
+
+ public override bool CanBuildChannelListener<TChannel>(BindingContext context)
+ {
+ if (context == null)
+ throw new ArgumentNullException("context");
+
+ context.BindingParameters.Add(this);
+ return context.CanBuildInnerChannelListener<TChannel>();
+ }
+
+
+ public override BindingElement Clone()
+ {
+ return new RawMessageEncodingBindingElement(this);
+ }
+
+
+
+ public override MessageVersion MessageVersion
+ {
+ get
+ {
+ return MessageVersion.None;
+ }
+
+ set
+ {
+ if (value != MessageVersion.None)
+ throw new ArgumentException("Unsupported message version");
+ }
+ }
+
+
+ }
+}
Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncodingBindingElement.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncodingBindingElement.cs
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlReader.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlReader.cs?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlReader.cs (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlReader.cs Wed Sep 2 23:38:03 2009
@@ -0,0 +1,353 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+ using System;
+ using System.IO;
+ using System.Xml;
+
+ internal class RawXmlReader : XmlDictionaryReader
+ {
+ ////this class presents a hardcoded XML InfoSet: "<rawtag>X</rawtag>" where X is the entire stream content
+
+ private Stream stream;
+ private bool closed;
+ private bool streamOwner;
+ private ReaderPosition position;
+ private string contentAsBase64;
+ private XmlNameTable xmlNameTable;
+ private XmlDictionaryReaderQuotas readerQuotas;
+
+ public RawXmlReader(Stream stream, XmlDictionaryReaderQuotas quotas, bool streamOwner)
+ {
+ this.stream = stream;
+ this.streamOwner = streamOwner;
+ if (quotas == null)
+ {
+ this.readerQuotas = new XmlDictionaryReaderQuotas();
+ }
+ else
+ {
+ this.readerQuotas = quotas;
+ }
+ }
+
+ private enum ReaderPosition
+ {
+ None,
+ StartElement,
+ Content,
+ EndElement,
+ EOF
+ }
+
+ public override int AttributeCount
+ {
+ get { return 0; }
+ }
+
+ public override string BaseURI
+ {
+ get { return string.Empty; }
+ }
+
+ public override int Depth
+ {
+ get { return (this.position == ReaderPosition.Content) ? 1 : 0; }
+ }
+
+ public override bool EOF
+ {
+ get { return this.position == ReaderPosition.EOF; }
+ }
+
+ public override bool HasAttributes
+ {
+ get { return false; }
+ }
+
+ public override bool HasValue
+ {
+ get { return this.position == ReaderPosition.Content; }
+ }
+
+ public override bool IsEmptyElement
+ {
+ get { return false; }
+ }
+
+ public override string LocalName
+ {
+ get
+ {
+ if (this.position == ReaderPosition.StartElement)
+ {
+ return RawMessageEncoder.StreamElementName;
+ }
+
+ return null;
+ }
+ }
+
+ public override string NamespaceURI
+ {
+ get { return string.Empty; }
+ }
+
+ public override XmlNameTable NameTable
+ {
+ get
+ {
+ if (this.xmlNameTable == null)
+ {
+ this.xmlNameTable = new NameTable();
+ this.xmlNameTable.Add(RawMessageEncoder.StreamElementName);
+ }
+
+ return this.xmlNameTable;
+ }
+ }
+
+ public override XmlNodeType NodeType
+ {
+ get
+ {
+ switch (this.position)
+ {
+ case ReaderPosition.StartElement:
+ return XmlNodeType.Element;
+ case ReaderPosition.Content:
+ return XmlNodeType.Text;
+ case ReaderPosition.EndElement:
+ return XmlNodeType.EndElement;
+ default:
+ // and StreamPosition.EOF
+ return XmlNodeType.None;
+ }
+ }
+ }
+
+ public override string Prefix
+ {
+ get { return string.Empty; }
+ }
+
+ public override ReadState ReadState
+ {
+ get
+ {
+ switch (this.position)
+ {
+ case ReaderPosition.None:
+ return ReadState.Initial;
+ case ReaderPosition.StartElement:
+ case ReaderPosition.Content:
+ case ReaderPosition.EndElement:
+ return ReadState.Interactive;
+ case ReaderPosition.EOF:
+ return ReadState.Closed;
+ default:
+ return ReadState.Error;
+ }
+ }
+ }
+
+ public override string Value
+ {
+ get
+ {
+ switch (this.position)
+ {
+ case ReaderPosition.Content:
+ if (this.contentAsBase64 == null)
+ {
+ this.contentAsBase64 = Convert.ToBase64String(this.ReadContentAsBase64());
+ }
+
+ return this.contentAsBase64;
+
+ default:
+ return string.Empty;
+ }
+ }
+ }
+
+ public override void Close()
+ {
+ if (!this.closed)
+ {
+ this.closed = true;
+ this.position = ReaderPosition.EOF;
+ this.readerQuotas = null;
+ if (this.streamOwner)
+ {
+ this.stream.Close();
+ }
+ }
+ }
+
+ public override string GetAttribute(int i)
+ {
+ throw new ArgumentOutOfRangeException("i", i, "Argument not in set of valid values");
+ }
+
+ public override string GetAttribute(string name, string namespaceURI)
+ {
+ return null;
+ }
+
+ public override string GetAttribute(string name)
+ {
+ return null;
+ }
+
+ public override string LookupNamespace(string prefix)
+ {
+ if (prefix == string.Empty)
+ {
+ return string.Empty;
+ }
+ else if (prefix == "xml")
+ {
+ return "http://www.w3.org/XML/1998/namespace";
+ }
+ else if (prefix == "xmlns")
+ {
+ return "http://www.w3.org/2000/xmlns/";
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public override bool MoveToAttribute(string name, string ns)
+ {
+ return false;
+ }
+
+ public override bool MoveToAttribute(string name)
+ {
+ return false;
+ }
+
+ public override bool MoveToElement()
+ {
+ if (this.position == ReaderPosition.None)
+ {
+ this.position = ReaderPosition.StartElement;
+ return true;
+ }
+
+ return false;
+ }
+
+ public override bool MoveToFirstAttribute()
+ {
+ return false;
+ }
+
+ public override bool MoveToNextAttribute()
+ {
+ return false;
+ }
+
+ public override bool Read()
+ {
+ switch (this.position)
+ {
+ case ReaderPosition.None:
+ this.position = ReaderPosition.StartElement;
+ return true;
+ case ReaderPosition.StartElement:
+ this.position = ReaderPosition.Content;
+ return true;
+ case ReaderPosition.Content:
+ this.position = ReaderPosition.EndElement;
+ return true;
+ case ReaderPosition.EndElement:
+ this.position = ReaderPosition.EOF;
+ return false;
+ case ReaderPosition.EOF:
+ return false;
+ default:
+ return false;
+ }
+ }
+
+ public override bool ReadAttributeValue()
+ {
+ return false;
+ }
+
+ public override int ReadContentAsBase64(byte[] buffer, int index, int count)
+ {
+ if (buffer == null)
+ {
+ throw new ArgumentNullException("buffer");
+ }
+
+ if (this.position != ReaderPosition.Content)
+ {
+ throw new InvalidOperationException("XML reader not in Element");
+ }
+
+ if (count == 0)
+ {
+ return 0;
+ }
+
+ int readCount = this.stream.Read(buffer, index, count);
+ if (readCount == 0)
+ {
+ this.position = ReaderPosition.EndElement;
+ }
+
+ return readCount;
+ }
+
+ public override int ReadContentAsBinHex(byte[] buffer, int index, int count)
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void ResolveEntity()
+ {
+ throw new NotSupportedException();
+ }
+
+ public override bool TryGetBase64ContentLength(out int length)
+ {
+ // The whole stream is this one element
+ if (!this.closed && this.stream.CanSeek)
+ {
+ long streamLength = this.stream.Length;
+ if (streamLength <= int.MaxValue)
+ {
+ length = (int)streamLength;
+ return true;
+ }
+ }
+
+ length = -1;
+ return false;
+ }
+ }
+}
Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlReader.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlReader.cs
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlWriter.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlWriter.cs?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlWriter.cs (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlWriter.cs Wed Sep 2 23:38:03 2009
@@ -0,0 +1,221 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+ using System;
+ using System.IO;
+ using System.Xml;
+
+ internal sealed class RawXmlWriter : XmlDictionaryWriter
+ {
+
+ WriteState state;
+ Stream stream;
+ bool closed;
+ bool rawWritingEnabled;
+
+ public RawXmlWriter(Stream stream)
+ {
+ if (stream == null)
+ {
+ throw new ArgumentNullException("Stream");
+ }
+
+ this.stream = stream;
+ this.state = WriteState.Start;
+ }
+
+ public override WriteState WriteState
+ {
+ get
+ {
+ return this.state;
+ }
+ }
+
+ public override void Close()
+ {
+ if (!this.closed)
+ {
+ this.closed = true;
+ this.state = WriteState.Closed;
+ this.rawWritingEnabled = false;
+ }
+ }
+
+ public override void Flush()
+ {
+ this.ThrowIfClosed();
+ this.stream.Flush();
+ }
+
+ public override string LookupPrefix(string ns)
+ {
+ return null;
+ }
+
+ public override void WriteBase64(byte[] buffer, int index, int count)
+ {
+ if (buffer == null)
+ {
+ throw new ArgumentNullException("buffer");
+ }
+
+ ThrowIfClosed();
+
+ if (!this.rawWritingEnabled)
+ {
+ throw new InvalidOperationException("XmlWriter not in Element");
+ }
+
+ this.stream.Write(buffer, index, count);
+ this.state = WriteState.Content;
+ }
+
+ public override void WriteStartElement(string prefix, string localName, string ns)
+ {
+ ThrowIfClosed();
+ if (this.state != WriteState.Start)
+ {
+ throw new InvalidOperationException("Start Element Already Called");
+ }
+
+ if (!string.IsNullOrEmpty(prefix) || !string.IsNullOrEmpty(ns) || localName != RawMessageEncoder.StreamElementName)
+ {
+ throw new XmlException("Wrong XML Start Element Name");
+ }
+ this.state = WriteState.Element;
+ this.rawWritingEnabled = true;
+ }
+
+ public override void WriteEndElement()
+ {
+ ThrowIfClosed();
+ if (!this.rawWritingEnabled)
+ {
+ throw new InvalidOperationException("Unexpected End Element");
+ }
+ this.rawWritingEnabled = false;
+ }
+
+ public override void WriteFullEndElement()
+ {
+ this.WriteEndElement();
+ }
+
+ public override void WriteEndDocument()
+ {
+ this.rawWritingEnabled = false;
+ this.ThrowIfClosed();
+ }
+
+ public override void WriteStartDocument()
+ {
+ this.rawWritingEnabled = false;
+ this.ThrowIfClosed();
+ }
+
+ public override void WriteStartDocument(bool standalone)
+ {
+ this.rawWritingEnabled = false;
+ this.ThrowIfClosed();
+ }
+
+ private void ThrowIfClosed()
+ {
+ if (this.closed)
+ {
+ throw new InvalidOperationException("XML Writer closed");
+ }
+ }
+
+
+ public override void WriteString(string text)
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void WriteCData(string text)
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void WriteCharEntity(char ch)
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void WriteChars(char[] buffer, int index, int count)
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void WriteComment(string text)
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void WriteDocType(string name, string pubid, string sysid, string subset)
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void WriteEndAttribute()
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void WriteEntityRef(string name)
+ {
+ throw new NotSupportedException();
+ }
+
+
+ public override void WriteProcessingInstruction(string name, string text)
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void WriteRaw(string data)
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void WriteRaw(char[] buffer, int index, int count)
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void WriteStartAttribute(string prefix, string localName, string ns)
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void WriteSurrogateCharEntity(char lowChar, char highChar)
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void WriteWhitespace(string ws)
+ {
+ throw new NotSupportedException();
+ }
+ }
+}
Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlWriter.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlWriter.cs
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp Wed Sep 2 23:38:03 2009
@@ -0,0 +1,165 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/framing/FrameSet.h"
+
+#include "AmqpConnection.h"
+#include "AmqpSession.h"
+#include "QpidMarshal.h"
+#include "QpidException.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace msclr;
+
+using namespace qpid::client;
+using namespace std;
+
+
+// Note on locks: Use "this" for fast counting and idle/busy
+// notifications. Use the "sessions" list to serialize session
+// creation/reaping and overall tear down.
+// TODO: switch "this" lock to separate non-visible Object.
+
+
+AmqpConnection::AmqpConnection(String^ server, int port) :
+ connectionp(NULL),
+ busyCount(0),
+ disposed(false)
+{
+ bool success = false;
+ System::Exception^ openException = nullptr;
+ sessions = gcnew Collections::Generic::List<AmqpSession^>();
+
+ try {
+ connectionp = new Connection;
+ connectionp->open (QpidMarshal::ToNative(server), port);
+ // TODO: registerFailureCallback for failover
+ success = true;
+ const ConnectionSettings& settings = connectionp->getNegotiatedSettings();
+ this->maxFrameSize = settings.maxFrameSize;
+ } catch (const qpid::Exception& error) {
+ String^ errmsg = gcnew String(error.what());
+ openException = gcnew QpidException(errmsg);
+ } finally {
+ if (!success) {
+ Cleanup();
+ if (openException == nullptr) {
+ openException = gcnew QpidException ("unknown connection failure");
+ }
+ throw openException;
+ }
+ }
+}
+
+void AmqpConnection::Cleanup()
+{
+ {
+ lock l(sessions);
+ if (disposed)
+ return;
+ disposed = true;
+ }
+
+ try {
+ // let the child sessions clean up
+ for each(AmqpSession^ s in sessions) {
+ s->ConnectionClosed();
+ }
+ }
+ finally
+ {
+ if (connectionp != NULL) {
+ connectionp->close();
+ delete connectionp;
+ connectionp = NULL;
+ }
+ }
+}
+
+AmqpConnection::~AmqpConnection()
+{
+ Cleanup();
+}
+
+AmqpConnection::!AmqpConnection()
+{
+ Cleanup();
+}
+
+void AmqpConnection::Close()
+{
+ // Simulate Dispose()...
+ Cleanup();
+ GC::SuppressFinalize(this);
+}
+
+AmqpSession^ AmqpConnection::CreateSession()
+{
+ lock l(sessions);
+ if (disposed) {
+ throw gcnew ObjectDisposedException("AmqpConnection");
+ }
+ AmqpSession^ session = gcnew AmqpSession(this, connectionp);
+ sessions->Add(session);
+ return session;
+}
+
+// called whenever a child session becomes newly busy (a first reader or writer since last idle)
+
+void AmqpConnection::NotifyBusy()
+{
+ bool changed = false;
+ {
+ lock l(this);
+ if (busyCount++ == 0)
+ changed = true;
+ }
+}
+
+// called whenever a child session becomes newly idle (a last reader or writer has closed)
+// The connection is idle when none of its child sessions are busy
+
+void AmqpConnection::NotifyIdle()
+{
+ bool connectionIdle = false;
+ {
+ lock l(this);
+ if (--busyCount == 0)
+ connectionIdle = true;
+ }
+ if (connectionIdle) {
+ OnConnectionIdle(this, System::EventArgs::Empty);
+ }
+}
+
+
+}}} // namespace Apache::Qpid::Interop
Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h Wed Sep 2 23:38:03 2009
@@ -0,0 +1,71 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace std;
+using namespace qpid::client;
+
+ref class AmqpSession;
+
+public delegate void ConnectionIdleEventHandler(Object^ sender, EventArgs^ eventArgs);
+
+public ref class AmqpConnection
+{
+private:
+ Connection* connectionp;
+ void Cleanup();
+ bool disposed;
+ Collections::Generic::List<AmqpSession^>^ sessions;
+ bool isOpen;
+ int busyCount;
+ int maxFrameSize;
+
+ internal:
+ void NotifyBusy();
+ void NotifyIdle();
+
+ property int MaxFrameSize {
+ int get () { return maxFrameSize; }
+ }
+
+public:
+ AmqpConnection(System::String^ server, int port);
+ ~AmqpConnection();
+ !AmqpConnection();
+ void Close();
+ AmqpSession^ CreateSession();
+ event ConnectionIdleEventHandler^ OnConnectionIdle;
+
+ property bool IsOpen {
+ bool get() { return isOpen; }
+ };
+
+ property bool IsIdle {
+ bool get() { return (busyCount == 0); }
+ }
+};
+
+
+}}} // namespace Apache::Qpid::Interop
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org