You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2009/09/03 01:38:07 UTC

svn commit: r810734 [3/5] - in /qpid/trunk/qpid/wcf: ./ samples/ samples/Channel/ samples/Channel/WCFToWCFDirect/ samples/Channel/WCFToWCFDirect/Client/ samples/Channel/WCFToWCFDirect/Client/Properties/ samples/Channel/WCFToWCFDirect/Service/ samples/C...

Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs Wed Sep  2 23:38:03 2009
@@ -0,0 +1,592 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+// TODO: flow control
+//       timeout handling
+//       transactions
+//       check if should split into separate input and output classes (little overlap)
+
+namespace Apache.Qpid.Channel
+{
+    using System;
+    using System.Collections;
+    using System.Collections.Generic;
+    using System.ServiceModel;
+    using System.ServiceModel.Channels;
+    using System.Text;
+    using System.Threading;
+    using System.Globalization;
+    using System.Xml;
+
+    // the thin interop layer that provides access to the Qpid AMQP client libraries
+    using Apache.Qpid.Interop;
+    using Apache.Qpid.AmqpTypes;
+
+    /// <summary>
+    /// WCF client transport channel for accessing AMQP brokers using the Qpid C++ library
+    /// </summary>
+    public class AmqpTransportChannel : ChannelBase, IOutputChannel, IInputChannel
+    {
+        private static readonly EndpointAddress AnonymousAddress =
+            new EndpointAddress("http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous");
+
+        private EndpointAddress remoteAddress;
+        private MessageEncoder encoder;
+        private AmqpChannelProperties factoryChannelProperties;
+        private bool shared;
+        private string encoderContentType;
+
+        // input = 0-10 queue, output = 0-10 exchange
+        private string queueName;
+
+        private String routingKey;
+        private BufferManager bufferManager;
+        private AmqpProperties outputMessageProperties;
+
+        private InputLink inputLink;
+        private OutputLink outputLink;
+
+        private bool isInputChannel;
+        private bool streamed;
+
+        private AsyncTimeSpanCaller asyncOpenCaller;
+        private AsyncTimeSpanCaller asyncCloseCaller;
+
+        /// <summary>
+        /// Creates a channel for the queue or exchange named by <paramref name="remoteAddress"/>.
+        /// The factory type decides the direction: a ChannelListenerBase or an
+        /// AmqpChannelFactory&lt;IInputChannel&gt; yields an input channel, anything else an output channel.
+        /// The matching input or output link is obtained from the ConnectionManager here.
+        /// </summary>
+        /// <param name="factory">The channel manager that owns this channel.</param>
+        /// <param name="channelProperties">Binding level settings; TransferMode must be Streamed or Buffered.</param>
+        /// <param name="remoteAddress">The AMQP address of the queue or exchange; must not be null.</param>
+        /// <param name="msgEncoder">The encoder used to convert between WCF and AMQP message bodies.</param>
+        /// <param name="maxBufferPoolSize">Maximum size of the buffer pool created for this channel.</param>
+        /// <param name="sharedConnection">Passed to the ConnectionManager when requesting the link.</param>
+        internal AmqpTransportChannel(ChannelManagerBase factory, AmqpChannelProperties channelProperties, EndpointAddress remoteAddress, MessageEncoder msgEncoder, long maxBufferPoolSize, bool sharedConnection)
+            : base(factory)
+        {
+            // the direction must be known before the URI is parsed, because ParseAmqpUri validates against it
+            this.isInputChannel = (factory is ChannelListenerBase) || (factory is AmqpChannelFactory<IInputChannel>);
+
+            if (remoteAddress == null)
+            {
+                throw new ArgumentException("Null Endpoint Address");
+            }
+
+            this.remoteAddress = remoteAddress;
+            this.shared = sharedConnection;
+            this.factoryChannelProperties = channelProperties;
+
+            // pull the queue name and the optional routing key out of the address
+            this.ParseAmqpUri(remoteAddress.Uri);
+
+            // remember the encoder's content type with everything from the first ';' onwards removed
+            this.encoder = msgEncoder;
+            string mediaType = String.Empty;
+            if (msgEncoder != null)
+            {
+                mediaType = msgEncoder.ContentType;
+                if (mediaType == null)
+                {
+                    mediaType = "application/octet-stream";
+                }
+                else
+                {
+                    int semicolon = mediaType.IndexOf(';');
+                    if (semicolon >= 0)
+                    {
+                        mediaType = mediaType.Substring(0, semicolon).Trim();
+                    }
+                }
+            }
+
+            this.encoderContentType = mediaType;
+
+            switch (channelProperties.TransferMode)
+            {
+                case TransferMode.Streamed:
+                    this.streamed = true;
+                    break;
+
+                case TransferMode.Buffered:
+                    this.streamed = false;
+                    break;
+
+                default:
+                    throw new ArgumentException("TransferMode mode must be \"Streamed\" or \"Buffered\"");
+            }
+
+            this.bufferManager = BufferManager.CreateBufferManager(maxBufferPoolSize, int.MaxValue);
+
+            // delegates used to run the synchronous open/close on behalf of the async variants
+            this.asyncOpenCaller = new AsyncTimeSpanCaller(this.OnOpen);
+            this.asyncCloseCaller = new AsyncTimeSpanCaller(this.OnClose);
+
+            if (!this.isInputChannel)
+            {
+                this.outputLink = ConnectionManager.GetOutputLink(channelProperties, sharedConnection, false, this.queueName);
+            }
+            else
+            {
+                this.inputLink = ConnectionManager.GetInputLink(channelProperties, sharedConnection, false, this.queueName);
+            }
+        }
+
+        private delegate bool AsyncTryReceiveCaller(TimeSpan timeout, out Message message);
+
+        private delegate void AsyncTimeSpanCaller(TimeSpan timeout);
+
+        /// <summary>
+        /// Gets the endpoint address that was supplied when this channel was constructed.
+        /// </summary>
+        EndpointAddress IOutputChannel.RemoteAddress
+        {
+            get { return this.remoteAddress; }
+        }
+
+        /// <summary>
+        /// Gets the address of this input channel, i.e. what you would insert into a
+        /// ReplyTo header to reach here.  Presumably this should be exchange/link and
+        /// routing info rather than the actual input queue name; for now the anonymous
+        /// address is always returned.
+        /// </summary>
+        EndpointAddress IInputChannel.LocalAddress
+        {
+            // TODO: something better
+            get { return AnonymousAddress; }
+        }
+
+        /// <summary>
+        /// Gets the AMQP properties for outgoing messages.  The value is resolved on first
+        /// access: the factory's DefaultMessageProperties if set, otherwise a newly
+        /// constructed AmqpProperties.  The result is cached for later calls.
+        /// </summary>
+        AmqpProperties OutputMessageProperties
+        {
+            get
+            {
+                AmqpProperties cached = this.outputMessageProperties;
+                if (cached != null)
+                {
+                    return cached;
+                }
+
+                cached = this.factoryChannelProperties.DefaultMessageProperties;
+                if (cached == null)
+                {
+                    cached = new AmqpProperties();
+                }
+
+                this.outputMessageProperties = cached;
+                return cached;
+            }
+        }
+
+        /// <summary>
+        /// Gets the transport address; this is the URI of the remote address.
+        /// </summary>
+        Uri IOutputChannel.Via
+        {
+            get { return this.remoteAddress.Uri; }
+        }
+
+        /// <summary>
+        /// Returns this channel when <typeparamref name="T"/> is the channel shape that matches
+        /// its direction (IInputChannel for an input channel, IOutputChannel for an output
+        /// channel); every other request is delegated to the base class.
+        /// </summary>
+        public override T GetProperty<T>()
+        {
+            Type requested = typeof(T);
+            bool matchesInput = this.isInputChannel && (requested == typeof(IInputChannel));
+            bool matchesOutput = !this.isInputChannel && (requested == typeof(IOutputChannel));
+
+            if (matchesInput || matchesOutput)
+            {
+                return (T)(object)this;
+            }
+
+            return base.GetProperty<T>();
+        }
+
+        /// <summary>
+        /// Converts the WCF message to an AMQP message and sends it on the output link.
+        /// The WCF message is closed and the AMQP message disposed whether or not the send succeeds.
+        /// </summary>
+        /// <param name="message">The message to send; closed before this method returns.</param>
+        /// <param name="timeout">Validated, then passed to the output link.</param>
+        public void Send(Message message, TimeSpan timeout)
+        {
+            this.ThrowIfDisposedOrNotOpen();
+            AmqpChannelHelpers.ValidateTimeout(timeout);
+
+            try
+            {
+                AmqpMessage outgoing = this.WcfToQpid(message);
+                try
+                {
+                    this.outputLink.Send(outgoing, timeout);
+                }
+                finally
+                {
+                    if (outgoing != null)
+                    {
+                        outgoing.Dispose();
+                    }
+                }
+            }
+            finally
+            {
+                message.Close();
+            }
+        }
+
+        /// <summary>
+        /// Sends the message using the channel's default send timeout.
+        /// </summary>
+        public void Send(Message message)
+        {
+            TimeSpan timeout = this.DefaultSendTimeout;
+            this.Send(message, timeout);
+        }
+
+        /// <summary>
+        /// Converts the WCF message to an AMQP message and starts an asynchronous send on
+        /// the output link.  The WCF message is closed before this method returns.
+        /// </summary>
+        /// <returns>The IAsyncResult produced by the output link; pass it to EndSend.</returns>
+        public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
+        {
+            this.ThrowIfDisposedOrNotOpen();
+            AmqpChannelHelpers.ValidateTimeout(timeout);
+
+            try
+            {
+                AmqpMessage outgoing = this.WcfToQpid(message);
+                try
+                {
+                    return this.outputLink.BeginSend(outgoing, timeout, callback, state);
+                }
+                finally
+                {
+                    // NOTE(review): the AMQP message is disposed as soon as BeginSend returns, i.e.
+                    // possibly before the send completes.  Presumably the output link has taken what
+                    // it needs by then - confirm against OutputLink.BeginSend.
+                    if (outgoing != null)
+                    {
+                        outgoing.Dispose();
+                    }
+                }
+            }
+            finally
+            {
+                message.Close();
+            }
+        }
+
+        /// <summary>
+        /// Starts an asynchronous send using the channel's default send timeout.
+        /// </summary>
+        public IAsyncResult BeginSend(Message message, AsyncCallback callback, object state)
+        {
+            TimeSpan timeout = this.DefaultSendTimeout;
+            return this.BeginSend(message, timeout, callback, state);
+        }
+
+        /// <summary>
+        /// Completes an asynchronous send by handing the result back to the output link.
+        /// </summary>
+        /// <param name="result">The value returned by the matching BeginSend call.</param>
+        public void EndSend(IAsyncResult result)
+        {
+            this.outputLink.EndSend(result);
+        }
+
+        /// <summary>
+        /// Receives a message, failing if TryReceive reports that none was obtained.
+        /// </summary>
+        /// <exception cref="TimeoutException">TryReceive returned false.</exception>
+        public Message Receive(TimeSpan timeout)
+        {
+            Message received;
+            if (!this.TryReceive(timeout, out received))
+            {
+                throw new TimeoutException("Receive");
+            }
+
+            return received;
+        }
+
+        /// <summary>
+        /// Receives a message using the channel's default receive timeout.
+        /// </summary>
+        public Message Receive()
+        {
+            TimeSpan timeout = this.DefaultReceiveTimeout;
+            return this.Receive(timeout);
+        }
+
+        /// <summary>
+        /// Asks the input link for a message and, when one is obtained, converts it to a WCF message.
+        /// </summary>
+        /// <param name="timeout">Passed to the input link.</param>
+        /// <param name="message">The converted message, or null when the link returned false.</param>
+        /// <returns>The value returned by the input link's TryReceive.</returns>
+        public bool TryReceive(TimeSpan timeout, out Message message)
+        {
+            this.ThrowIfDisposedOrNotOpen();
+
+            AmqpMessage incoming;
+            bool obtained = this.inputLink.TryReceive(timeout, out incoming);
+            message = obtained ? this.QpidToWcf(incoming) : null;
+            return obtained;
+        }
+ 
+        /// <summary>
+        /// Starts an asynchronous receive on the input link; complete it with EndTryReceive.
+        /// </summary>
+        public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
+        {
+            IAsyncResult pending = this.inputLink.BeginTryReceive(timeout, callback, state);
+            return pending;
+        }
+
+        /// <summary>
+        /// Completes an asynchronous receive and converts the AMQP message, if any, to a WCF message.
+        /// </summary>
+        /// <param name="result">The value returned by BeginTryReceive or BeginReceive.</param>
+        /// <param name="message">The converted message, or null when the link returned false.</param>
+        /// <returns>The value returned by the input link's EndTryReceive.</returns>
+        public bool EndTryReceive(IAsyncResult result, out Message message)
+        {
+            AmqpMessage incoming = null;
+            bool obtained = this.inputLink.EndTryReceive(result, out incoming);
+            message = obtained ? this.QpidToWcf(incoming) : null;
+            return obtained;
+        }
+
+        /// <summary>
+        /// Delegates to the input link's WaitForMessage and returns its result.
+        /// </summary>
+        public bool WaitForMessage(TimeSpan timeout)
+        {
+            bool available = this.inputLink.WaitForMessage(timeout);
+            return available;
+        }
+
+        /// <summary>
+        /// Starts an asynchronous receive.  This uses the input link's BeginTryReceive;
+        /// EndReceive completes it through EndTryReceive.
+        /// </summary>
+        public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
+        {
+            IAsyncResult pending = this.inputLink.BeginTryReceive(timeout, callback, state);
+            return pending;
+        }
+
+        /// <summary>
+        /// Starts an asynchronous receive using the channel's default receive timeout.
+        /// </summary>
+        public IAsyncResult BeginReceive(AsyncCallback callback, object state)
+        {
+            TimeSpan timeout = this.DefaultReceiveTimeout;
+            return this.BeginReceive(timeout, callback, state);
+        }
+
+        /// <summary>
+        /// Starts an asynchronous wait on the input link; complete it with EndWaitForMessage.
+        /// </summary>
+        public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
+        {
+            IAsyncResult pending = this.inputLink.BeginWaitForMessage(timeout, callback, state);
+            return pending;
+        }
+
+        /// <summary>
+        /// Completes an asynchronous receive started with BeginReceive.
+        /// </summary>
+        /// <exception cref="TimeoutException">EndTryReceive returned false.</exception>
+        public Message EndReceive(IAsyncResult result)
+        {
+            Message received;
+            if (!this.EndTryReceive(result, out received))
+            {
+                throw new TimeoutException("EndReceive");
+            }
+
+            return received;
+        }
+
+        /// <summary>
+        /// Completes an asynchronous wait started with BeginWaitForMessage.
+        /// </summary>
+        public bool EndWaitForMessage(IAsyncResult result)
+        {
+            bool available = this.inputLink.EndWaitForMessage(result);
+            return available;
+        }
+
+        /// <summary>
+        /// Closes whichever of the input and output links this channel holds.
+        /// </summary>
+        public void CloseEndPoint()
+        {
+            InputLink input = this.inputLink;
+            if (input != null)
+            {
+                input.Close();
+            }
+
+            OutputLink output = this.outputLink;
+            if (output != null)
+            {
+                output.Close();
+            }
+        }
+
+        /// <summary>
+        /// Currently a no-op: the input or output link is already obtained in the constructor.
+        /// </summary>
+        protected override void OnOpen(TimeSpan timeout)
+        {
+            // TODO: move the open logic here from the constructor
+        }
+
+        /// <summary>
+        /// Starts OnOpen asynchronously through the delegate created in the constructor.
+        /// </summary>
+        protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
+        {
+            IAsyncResult pending = this.asyncOpenCaller.BeginInvoke(timeout, callback, state);
+            return pending;
+        }
+
+        /// <summary>
+        /// Completes the asynchronous open started by OnBeginOpen.
+        /// </summary>
+        protected override void OnEndOpen(IAsyncResult result)
+        {
+            this.asyncOpenCaller.EndInvoke(result);
+        }
+
+        /// <summary>
+        /// Aborts the channel.  Only the buffer pool is cleared; the links are not closed here.
+        /// </summary>
+        protected override void OnAbort()
+        {
+            //// TODO: check for network-less qpid teardown or launch special thread
+            this.Cleanup();
+        }
+
+        /// <summary>
+        /// Shuts down gracefully: closes the link held by this channel, then clears the buffer pool.
+        /// </summary>
+        /// <param name="timeout">Not used by this implementation.</param>
+        protected override void OnClose(TimeSpan timeout)
+        {
+            this.CloseEndPoint();
+            this.Cleanup();
+        }
+
+        /// <summary>
+        /// Starts OnClose asynchronously through the delegate created in the constructor.
+        /// </summary>
+        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
+        {
+            IAsyncResult pending = this.asyncCloseCaller.BeginInvoke(timeout, callback, state);
+            return pending;
+        }
+
+        /// <summary>
+        /// Completes the asynchronous close started by OnBeginClose.
+        /// </summary>
+        protected override void OnEndClose(IAsyncResult result)
+        {
+            this.asyncCloseCaller.EndInvoke(result);
+        }
+
+        /// <summary>
+        /// Builds an AMQP message from a WCF message.  The AMQP properties are layered in
+        /// this order, later layers overriding earlier ones: the factory's default message
+        /// properties, the routing key from the channel URI, and any AmqpProperties stored
+        /// under the "AmqpProperties" key of the WCF message.  The body is written by the encoder.
+        /// </summary>
+        /// <param name="wcfMessage">The outgoing WCF message.</param>
+        /// <returns>The populated AMQP message; the caller is responsible for disposing it.</returns>
+        private AmqpMessage WcfToQpid(Message wcfMessage)
+        {
+            // per-message properties supplied by the application, if any
+            object rawProperties;
+            AmqpProperties perMessage = null;
+            if (wcfMessage.Properties.TryGetValue("AmqpProperties", out rawProperties))
+            {
+                perMessage = rawProperties as AmqpProperties;
+            }
+
+            AmqpMessage result = null;
+            bool completed = false;
+
+            try
+            {
+                AmqpProperties merged = new AmqpProperties();
+
+                // binding level defaults first
+                AmqpProperties defaults = this.factoryChannelProperties.DefaultMessageProperties;
+                if (defaults != null)
+                {
+                    merged.MergeFrom(defaults);
+                }
+
+                // then the routing key taken from the URI
+                if (this.routingKey != null)
+                {
+                    merged.RoutingKey = this.routingKey;
+                }
+
+                // application properties trump channel properties
+                if (perMessage != null)
+                {
+                    merged.MergeFrom(perMessage);
+                }
+
+                result = this.outputLink.CreateMessage();
+                result.Properties = merged;
+
+                // copy the WCF message body to the AMQP message body
+                if (!this.streamed)
+                {
+                    ArraySegment<byte> payload = this.encoder.WriteMessage(wcfMessage, int.MaxValue, this.bufferManager);
+                    try
+                    {
+                        result.BodyStream.Write(payload.Array, payload.Offset, payload.Count);
+                    }
+                    finally
+                    {
+                        // the encoder took this buffer from the pool; hand it back
+                        this.bufferManager.ReturnBuffer(payload.Array);
+                    }
+                }
+                else
+                {
+                    this.encoder.WriteMessage(wcfMessage, result.BodyStream);
+                }
+
+                completed = true;
+            }
+            finally
+            {
+                // do not leak a half-built message when something above threw
+                if (!completed && (result != null))
+                {
+                    result.Dispose();
+                }
+            }
+
+            return result;
+        }
+     
+
+        /// <summary>
+        /// Builds a WCF message from a received AMQP message.  The body is decoded by the
+        /// encoder, the channel's remote address is applied to the message, and the AMQP
+        /// properties (when present) are stored under the "AmqpProperties" key.
+        /// </summary>
+        /// <param name="amqpMessage">The received AMQP message; may be null.</param>
+        /// <returns>
+        /// The WCF message, or null when <paramref name="amqpMessage"/> is null.  The result
+        /// may also be null or only partly populated if a non-XML exception was swallowed below.
+        /// </returns>
+        /// <exception cref="ProtocolException">The encoder raised an XmlException.</exception>
+        private Message QpidToWcf(AmqpMessage amqpMessage)
+        {
+            if (amqpMessage == null)
+            {
+                return null;
+            }
+
+            Message wcfMessage = null;
+            byte[] managedBuffer = null;
+
+            try
+            {
+                if (this.streamed)
+                {
+                    wcfMessage = this.encoder.ReadMessage(amqpMessage.BodyStream, int.MaxValue);
+                }
+                else
+                {
+                    int count = (int)amqpMessage.BodyStream.Length;
+                    managedBuffer = this.bufferManager.TakeBuffer(count);
+
+                    // Stream.Read is allowed to return fewer bytes than requested, so keep
+                    // reading until the whole body has arrived or the stream ends.
+                    int total = 0;
+                    while (total < count)
+                    {
+                        int nr = amqpMessage.BodyStream.Read(managedBuffer, total, count - total);
+                        if (nr <= 0)
+                        {
+                            break;
+                        }
+
+                        total += nr;
+                    }
+
+                    // only expose the bytes actually read; a pooled buffer may hold stale data beyond them
+                    ArraySegment<byte> bufseg = new ArraySegment<byte>(managedBuffer, 0, total);
+
+                    wcfMessage = this.encoder.ReadMessage(bufseg, this.bufferManager);
+
+                    // set to null for finally{} block, since the encoder is now responsible for
+                    // returning the BufferManager memory
+                    managedBuffer = null;
+                }
+
+                // This message will be discarded unless the "To" header matches
+                // the WCF endpoint dispatcher's address filter (or the service is
+                // AddressFilterMode=AddressFilterMode.Any).
+
+                this.remoteAddress.ApplyTo(wcfMessage);
+
+                if (amqpMessage.Properties != null)
+                {
+                    wcfMessage.Properties.Add("AmqpProperties", amqpMessage.Properties);
+                }
+            }
+            catch (XmlException xmlException)
+            {
+                throw new ProtocolException(
+                    "There is a problem with the XML that was received from the network. See inner exception for more details.",
+                    xmlException);
+            }
+            catch (Exception e)
+            {
+                // TODO: logging
+                // NOTE(review): the exception is swallowed, so the caller still reports a
+                // successful receive with whatever wcfMessage holds at this point.
+                Console.WriteLine("TX channel encoder exception " + e);
+            }
+            finally
+            {
+                // close the amqpMessage unless the body will be read at a later time.
+                if (!this.streamed || wcfMessage == null)
+                {
+                    amqpMessage.Close();
+                }
+
+                // the handoff to the encoder failed
+                if (managedBuffer != null)
+                {
+                    this.bufferManager.ReturnBuffer(managedBuffer);
+                }
+            }
+
+            return wcfMessage;
+        }
+
+        /// <summary>
+        /// Clears the channel's buffer pool; called from both OnClose and OnAbort.
+        /// </summary>
+        private void Cleanup()
+        {
+            this.bufferManager.Clear();
+        }
+
+        // "amqp:queue1" | "amqp:stocks@broker1.com" | "amqp:queue3?routingkey=key"
+        /// <summary>
+        /// Extracts the queue name (the URI's local path) and the optional "routingkey="
+        /// query argument from an AMQP address, and validates them against the channel direction.
+        /// </summary>
+        /// <param name="uri">The address to parse; its scheme must be AmqpConstants.Scheme.</param>
+        /// <exception cref="ArgumentException">
+        /// The scheme is not supported; an input channel has an empty queue name or one
+        /// containing '@'; or an output channel has neither a queue name nor a routing key.
+        /// </exception>
+        private void ParseAmqpUri(Uri uri)
+        {
+            if (uri.Scheme != AmqpConstants.Scheme)
+            {
+                throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
+                    "The scheme {0} specified in address is not supported.", uri.Scheme), "uri");
+            }
+
+            this.queueName = uri.LocalPath;
+
+            if ((this.queueName.IndexOf('@') != -1) && this.isInputChannel)
+            {
+                throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
+                    "Invalid input queue name: \"{0}\" specified.", this.queueName), "uri");
+            }
+
+            // search out session parameters in the query portion of the URI
+
+            const string RoutingParseKey = "routingkey=";
+            char[] charSeparators = new char[] { '?', ';' };
+            string[] args = uri.Query.Split(charSeparators, StringSplitOptions.RemoveEmptyEntries);
+            foreach (string s in args)
+            {
+                // the key is a protocol token, so compare ordinally rather than by current culture;
+                // when the argument is repeated the last occurrence wins
+                if (s.StartsWith(RoutingParseKey, StringComparison.Ordinal))
+                {
+                    this.routingKey = s.Substring(RoutingParseKey.Length);
+                }
+            }
+
+            if (this.queueName == String.Empty)
+            {
+                if (this.isInputChannel)
+                {
+                    throw new ArgumentException("Empty queue target specifier not allowed.", "uri");
+                }
+
+                // an output channel may omit the target as long as a routing key is given
+                if (this.routingKey == null)
+                {
+                    throw new ArgumentException("No target queue or routing key specified.", "uri");
+                }
+            }
+        }
+    }
+}

Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj Wed Sep  2 23:38:03 2009
@@ -0,0 +1,102 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="3.5" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+  <PropertyGroup>
+    <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+    <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+    <ProductVersion>9.0.21022</ProductVersion>
+    <SchemaVersion>2.0</SchemaVersion>
+    <ProjectGuid>{8AABAB30-7D1E-4539-B7D1-05450262BAD2}</ProjectGuid>
+    <OutputType>Library</OutputType>
+    <AppDesignerFolder>Properties</AppDesignerFolder>
+    <RootNamespace>Apache.Qpid.Channel</RootNamespace>
+    <AssemblyName>Apache.Qpid.Channel</AssemblyName>
+    <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+    <FileAlignment>512</FileAlignment>
+    <StartupObject>
+    </StartupObject>
+    <SignAssembly>false</SignAssembly>
+    <AssemblyOriginatorKeyFile>
+    </AssemblyOriginatorKeyFile>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+    <DebugSymbols>true</DebugSymbols>
+    <DebugType>full</DebugType>
+    <Optimize>false</Optimize>
+    <OutputPath>bin\Debug\</OutputPath>
+    <DefineConstants>DEBUG;TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+    <DebugType>pdbonly</DebugType>
+    <Optimize>true</Optimize>
+    <OutputPath>bin\Release\</OutputPath>
+    <DefineConstants>TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+  </PropertyGroup>
+  <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+  <!-- To modify your build process, add your task inside one of the targets below and uncomment it. 
+       Other similar extension points exist, see Microsoft.Common.targets.
+  <Target Name="BeforeBuild">
+  </Target>
+  <Target Name="AfterBuild">
+  </Target>
+  -->
+  <ItemGroup>
+    <Compile Include="AmqpBinaryBinding.cs" />
+    <Compile Include="AmqpBinaryBindingCollectionElement.cs" />
+    <Compile Include="AmqpBinaryBindingConfigurationElement.cs" />
+    <Compile Include="AmqpChannelFactory.cs" />
+    <Compile Include="AmqpChannelHelpers.cs" />
+    <Compile Include="AmqpChannelListener.cs" />
+    <Compile Include="AmqpBinding.cs" />
+    <Compile Include="AmqpBindingCollectionElement.cs" />
+    <Compile Include="AmqpBindingConfigurationElement.cs" />
+    <Compile Include="AmqpTransportBindingElement.cs" />
+    <Compile Include="AmqpTransportChannel.cs" />
+    <Compile Include="ConnectionManager.cs" />
+    <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="RawMessage.cs" />
+    <Compile Include="RawMessageEncoder.cs" />
+    <Compile Include="RawMessageEncoderFactory.cs" />
+    <Compile Include="RawMessageEncodingBindingElement.cs" />
+    <Compile Include="RawXmlReader.cs" />
+    <Compile Include="RawXmlWriter.cs" />
+  </ItemGroup>
+  <ItemGroup>
+    <Reference Include="System" />
+    <Reference Include="System.configuration" />
+    <Reference Include="System.Runtime.Serialization">
+      <RequiredTargetFramework>3.0</RequiredTargetFramework>
+    </Reference>
+    <Reference Include="System.ServiceModel">
+      <RequiredTargetFramework>3.0</RequiredTargetFramework>
+    </Reference>
+    <Reference Include="System.XML" />
+  </ItemGroup>
+  <ItemGroup>
+    <ProjectReference Include="..\Interop\Interop.vcproj">
+      <Project>{C9B6AC75-6332-47A4-B82B-0C20E0AF2D34}</Project>
+      <Name>Interop</Name>
+    </ProjectReference>
+  </ItemGroup>
+</Project>
\ No newline at end of file

Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs Wed Sep  2 23:38:03 2009
@@ -0,0 +1,266 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+    using System;
+    using System.Collections;
+    using System.Collections.Generic;
+    using System.Threading;
+
+    using Apache.Qpid.Interop;
+
+    // The ConnectionManager looks after a shareable pool of AmqpConnection and AmqpSession
+    // objects.  If two connection requests could be shared (see MakeKey() properties), and
+    // are designated as shareable, then they will be paired up.  Each shared connection is
+    // a separate instance of a ManagedConnection.  All unshared connections use a single
+    // instance of ManagedConnection with locking turned off.  The ManagedConnection object
+    // registers for notification when a connection goes idle (all grandchild InputLink and
+    // OutputLink objects have been closed), and closes the connection.
+
+    // TODO: the session sharing is roughed-in via comments but needs completing.
+
+    internal sealed class ConnectionManager
+    {
+        // A side effect of creating InputLinks and OutputLinks is that counters 
+        // in the respective AmqpSession and AmqpConnection are updated, so care
+        // must be taken to hold the lock across acquiring a session and opening
+        // a link on it.
+
+        // one for each shared connection
+        private static Dictionary<string, ManagedConnection> sharedInstances;
+
+        // this one creates and releases connections that are not shared.  No locking required.
+        private static ManagedConnection unsharedInstance;
+
+        // lock for finding or creating ManagedConnection instances 
+        private static Object connectionLock;
+
+        // Type initializer.  The lock object and the table of shared connections are
+        // created up front; the unshared ManagedConnection starts out null and is
+        // created on demand in GetManagedConnection().
+        static ConnectionManager()
+        {
+            connectionLock = new Object();
+            sharedInstances = new Dictionary<string, ManagedConnection>();
+            unsharedInstance = null;
+        }
+
+        // Builds the key used to look up a shared connection: broker host, broker
+        // port and transfer mode joined with ':'.
+        private static string MakeKey(AmqpChannelProperties props)
+        {
+            string hostAndPort = props.BrokerHost + ':' + props.BrokerPort;
+            return hostAndPort + ':' + props.TransferMode;
+        }
+
+        // Finds or creates the ManagedConnection that serves a request.
+        //   connectionSharing == false: the single unshared instance (created on first use).
+        //   connectionSharing == true : the instance registered under MakeKey(channelProperties),
+        //                               created and registered on first use.
+        // Both lookups are performed while holding connectionLock.
+        private static ManagedConnection GetManagedConnection(AmqpChannelProperties channelProperties, bool connectionSharing)
+        {
+            if (!connectionSharing)
+            {
+                lock (connectionLock)
+                {
+                    if (unsharedInstance == null)
+                    {
+                        unsharedInstance = new ManagedConnection(false);
+                    }
+
+                    return unsharedInstance;
+                }
+            }
+
+            string sharedKey = MakeKey(channelProperties);
+            lock (connectionLock)
+            {
+                ManagedConnection existing = null;
+                if (sharedInstances.TryGetValue(sharedKey, out existing))
+                {
+                    return existing;
+                }
+
+                ManagedConnection created = new ManagedConnection(true);
+                sharedInstances.Add(sharedKey, created);
+                return created;
+            }
+        }
+
+        // Obtains a ManagedConnection for the given properties and asks it for a link,
+        // passing qname as the output queue name (the input queue name is null).
+        public static OutputLink GetOutputLink(AmqpChannelProperties channelProperties, bool connectionSharing, bool sessionSharing, string qname)
+        {
+            object link = GetManagedConnection(channelProperties, connectionSharing)
+                .GetLink(channelProperties, sessionSharing, null, qname);
+            return (OutputLink)link;
+        }
+
+        // Obtains a ManagedConnection for the given properties and asks it for a link,
+        // passing qname as the input queue name (the output queue name is null).
+        public static InputLink GetInputLink(AmqpChannelProperties channelProperties, bool connectionSharing, bool sessionSharing, string qname)
+        {
+            object link = GetManagedConnection(channelProperties, connectionSharing)
+                .GetLink(channelProperties, sessionSharing, qname, null);
+            return (InputLink)link;
+        }
+
+
+
+        // Creates links on behalf of ConnectionManager.  A shared instance caches one
+        // AmqpConnection, reuses it for every link and serializes access with a private
+        // monitor.  The unshared instance keeps no connection state: every GetLink()
+        // call creates its own connection and no lock is taken.
+        class ManagedConnection
+        {
+            private readonly Boolean shared;
+
+            // Monitor used in the shared case.  A dedicated private object is used
+            // rather than "this" so that only code in this class can take the lock.
+            private readonly Object gate = new Object();
+
+            private AmqpConnection sharedConnection;
+            //private Dictionary<string, AmqpSession> sharedSessions;
+
+            public ManagedConnection(bool shared)
+            {
+                this.shared = shared;
+            }
+
+
+            // Creates an InputLink on inputQueue when inputQueue is non-null, otherwise an
+            // OutputLink on outputQueue, and returns it as object (callers cast).
+            // Throws NotImplementedException for a shared instance when sessionSharing is
+            // requested.  If anything fails after a new connection was created here, that
+            // connection is closed before the exception propagates.
+            public object GetLink(AmqpChannelProperties channelProperties, bool sessionSharing, string inputQueue, string outputQueue)
+            {
+                AmqpConnection connection = null;
+                AmqpSession session = null;
+                Object link = null;
+                bool newConnection = false;
+                //bool newSession = false;
+                bool success = false;
+
+                // when called in the non-shared case, only stack variables should be used for holding connections/sessions/links
+
+                if (this.shared)
+                {
+                    Monitor.Enter(this.gate); // lock
+                }
+
+                try
+                {
+                    if (this.shared)
+                    {
+                        // TODO: check shared connection not closed (i.e. network drop) and refresh this instance if needed
+                        if (sessionSharing)
+                        {
+                            throw new NotImplementedException("shared session");
+                            /* * ... once we have a defined shared session config parameter:
+
+                            // lazily create
+                            if (this.sharedSessions == null)
+                            {
+                                this.sharedSessions = new Dictionary<string, AmqpSession>();
+                            }
+
+                            alreadydeclaredstring sessionKey = channelProperties.name_of_key_goes_here;
+                            this.sharedSessions.TryGetValue(sessionKey, out session);
+
+                            * */
+                        }
+
+                        if (this.sharedConnection != null)
+                        {
+                            connection = this.sharedConnection;
+                        }
+                    }
+
+                    if (connection == null)
+                    {
+                        connection = new AmqpConnection(channelProperties.BrokerHost, channelProperties.BrokerPort);
+                        newConnection = true;
+                        if (this.shared)
+                        {
+                            connection.OnConnectionIdle += new ConnectionIdleEventHandler(this.IdleConnectionHandler);
+                        }
+                        else
+                        {
+                            connection.OnConnectionIdle += new ConnectionIdleEventHandler(UnsharedIdleConnectionHandler);
+                        }
+                    }
+
+                    if (session == null)
+                    {
+                        session = connection.CreateSession();
+                        //newSession = true;
+                    }
+
+                    if (inputQueue != null)
+                    {
+                        link = session.CreateInputLink(inputQueue);
+                    }
+                    else
+                    {
+                        link = session.CreateOutputLink(outputQueue);
+                    }
+
+                    // Publish the new connection only after the link was created, so a
+                    // failed attempt never leaves a half-initialized connection cached.
+                    if (this.shared)
+                    {
+                        if (newConnection)
+                        {
+                            this.sharedConnection = connection;
+                        }
+                        /*
+                        if (newSession)
+                        {
+                            sharedSessions.Add(foo, session);
+                        }
+                         * */
+                    }
+
+                    success = true;
+                }
+                finally
+                {
+                    // Release the monitor before closing a failed connection, so the
+                    // close itself is never performed while holding the lock.
+                    if (this.shared)
+                    {
+                        Monitor.Exit(this.gate);
+                    }
+                    if (!success)
+                    {
+                        /*
+                        if (newSession)
+                        {
+                            session.Close();
+                        }
+                         */
+                        if (newConnection)
+                        {
+                            connection.Close();
+                        }
+                    }
+                }
+
+                return link;
+            }
+
+
+            // Idle handler registered on connections created by the unshared instance:
+            // closes the connection that raised the event.
+            static void UnsharedIdleConnectionHandler(Object sender, EventArgs empty)
+            {
+                AmqpConnection connection = sender as AmqpConnection;
+                if (connection != null)
+                {
+                    connection.Close();
+                }
+            }
+
+            // Idle handler registered on the shared connection: closes and forgets the
+            // cached connection, unless the event is stale (raised by a connection that
+            // is not the cached one) or the connection is no longer idle.
+            void IdleConnectionHandler(Object sender, EventArgs empty)
+            {
+                lock (this.gate)
+                {
+                    if (sharedConnection != sender || sharedConnection == null)
+                    {
+                        return;
+                    }
+                    if (!sharedConnection.IsIdle)
+                    {
+                        // Another thread made the connection busy again.
+                        // That's OK.  Another idle event will come along later.
+                        return;
+                    }
+                    sharedConnection.Close();  // also closes all child sessions
+                    sharedConnection = null;
+                    //sharedSessions = null;
+                }
+            }
+        }
+    }
+}

Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Properties/AssemblyInfo.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Properties/AssemblyInfo.cs?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Properties/AssemblyInfo.cs (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Properties/AssemblyInfo.cs Wed Sep  2 23:38:03 2009
@@ -0,0 +1,52 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following 
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Apache.Qpid.Channel")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("")]
+[assembly: AssemblyCopyright("")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible 
+// to COM components.  If you need to access a type in this assembly from 
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("ac02bbb0-2c19-43fb-a36c-b1b0a50eaf1a")]
+
+// Version information for an assembly consists of the following four values:
+//
+//      Major Version
+//      Minor Version 
+//      Build Number
+//      Revision
+//
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]

Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Properties/AssemblyInfo.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Properties/AssemblyInfo.cs
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs Wed Sep  2 23:38:03 2009
@@ -0,0 +1,374 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+    using System;
+    using System.IO;
+    using System.ServiceModel.Channels;
+    using System.Xml;
+
+    // This incoming Message is backed either by a Stream (bodyStream) or a byte array (bodyBytes).
+    // If bodyBytes belongs to a BufferManager, we must return it when done.
+    // The pay-off is OnGetReaderAtBodyContents().
+    // Most of the complexity is dealing with the OnCreateBufferedCopy() machinery.
+    internal class RawMessage : Message
+    {
+        private MessageHeaders headers;
+        private MessageProperties properties;
+        private XmlDictionaryReaderQuotas readerQuotas;
+        private Stream bodyStream;
+        private byte[] bodyBytes;
+        private int index;
+        private int count;
+        private BufferManager bufferManager;
+
+        // Byte-array backed message; supports
+        // MessageEncoder.ReadMessage(ArraySegment<byte> b, BufferManager mgr, string contentType).
+        // The body is buffer[index .. index + count).  A null quotas argument is
+        // replaced by a default XmlDictionaryReaderQuotas instance.
+        public RawMessage(byte[] buffer, int index, int count, BufferManager bufferManager, XmlDictionaryReaderQuotas quotas)
+        {
+            this.headers = new MessageHeaders(MessageVersion.None);
+            this.properties = new MessageProperties();
+            this.readerQuotas = quotas ?? new XmlDictionaryReaderQuotas();
+
+            this.bodyBytes = buffer;
+            this.index = index;
+            this.count = count;
+            this.bufferManager = bufferManager;
+        }
+
+        // Stream backed message; supports
+        // MessageEncoder.ReadMessage(System.IO.Stream s, int max, string contentType).
+        // A null quotas argument is replaced by a default XmlDictionaryReaderQuotas.
+        public RawMessage(Stream stream, XmlDictionaryReaderQuotas quotas)
+        {
+            if (quotas == null)
+            {
+                quotas = new XmlDictionaryReaderQuotas();
+            }
+
+            this.headers = new MessageHeaders(MessageVersion.None);
+            this.properties = new MessageProperties();
+
+            // Keep the quotas: they are handed to the body reader and to buffered
+            // copies, which would otherwise receive a null reference.
+            this.readerQuotas = quotas;
+            this.bodyStream = stream;
+        }
+
+        // Used internally by CreateBufferedCopy().CreateMessage(): the new message
+        // gets its own copies of the headers and properties and refers to the body
+        // bytes, offsets and quotas it is given.
+        public RawMessage(MessageHeaders headers, MessageProperties properties, byte[] bytes, int index, int count, XmlDictionaryReaderQuotas quotas)
+        {
+            MessageHeaders headersCopy = new MessageHeaders(headers);
+            MessageProperties propertiesCopy = new MessageProperties(properties);
+
+            this.headers = headersCopy;
+            this.properties = propertiesCopy;
+            this.readerQuotas = quotas;
+            this.bodyBytes = bytes;
+            this.index = index;
+            this.count = count;
+        }
+
+        // Each property below throws ObjectDisposedException once the message has
+        // been disposed; otherwise it returns the stored or constant value.
+
+        public override MessageHeaders Headers
+        {
+            get
+            {
+                this.ThrowIfMessageDisposed();
+                return this.headers;
+            }
+        }
+
+        // Always false for a live message.
+        public override bool IsEmpty
+        {
+            get
+            {
+                this.ThrowIfMessageDisposed();
+                return false;
+            }
+        }
+
+        // Always false for a live message.
+        public override bool IsFault
+        {
+            get
+            {
+                this.ThrowIfMessageDisposed();
+                return false;
+            }
+        }
+
+        public override MessageProperties Properties
+        {
+            get
+            {
+                this.ThrowIfMessageDisposed();
+                return this.properties;
+            }
+        }
+
+        // Always MessageVersion.None for a live message.
+        public override MessageVersion Version
+        {
+            get
+            {
+                this.ThrowIfMessageDisposed();
+                return MessageVersion.None;
+            }
+        }
+
+        // Shared guard for the property getters above.
+        private void ThrowIfMessageDisposed()
+        {
+            if (this.IsDisposed)
+            {
+                throw new ObjectDisposedException("message");
+            }
+        }
+
+        // Writes a printable form of the body: the literal text "Stream" for a
+        // stream backed message, otherwise the body bytes as base64 inside a
+        // RawMessageEncoder.StreamElementName element.
+        protected override void OnBodyToString(XmlDictionaryWriter writer)
+        {
+            bool streamBacked = this.bodyStream != null;
+            if (streamBacked)
+            {
+                writer.WriteString("Stream");
+                return;
+            }
+
+            writer.WriteStartElement(RawMessageEncoder.StreamElementName, string.Empty);
+            writer.WriteBase64(this.bodyBytes, this.index, this.count);
+            writer.WriteEndElement();
+        }
+
+        // Runs three clean-up steps (base close, properties dispose, buffer return).
+        // Every step is attempted even if an earlier one throws; the first exception
+        // caught is rethrown once all steps have run.
+        protected override void OnClose()
+        {
+            Exception firstFailure = null;
+
+            try
+            {
+                base.OnClose();
+            }
+            catch (Exception ex)
+            {
+                firstFailure = ex;
+            }
+
+            try
+            {
+                MessageProperties props = this.properties;
+                if (props != null)
+                {
+                    props.Dispose();
+                }
+            }
+            catch (Exception ex)
+            {
+                firstFailure = firstFailure ?? ex;
+            }
+
+            try
+            {
+                // A non-null bufferManager means bodyBytes is still on loan from it.
+                BufferManager lender = this.bufferManager;
+                if (lender != null)
+                {
+                    lender.ReturnBuffer(this.bodyBytes);
+                    this.bufferManager = null;
+                }
+            }
+            catch (Exception ex)
+            {
+                firstFailure = firstFailure ?? ex;
+            }
+
+            if (firstFailure == null)
+            {
+                return;
+            }
+
+            throw firstFailure;
+        }
+
+        // Creates a MessageBuffer over the body.  As a side effect this message is
+        // converted to a plain (garbage collected) byte array body: a stream body is
+        // read into memory, and a body on loan from a BufferManager is copied and the
+        // loaned buffer returned.  maxBufferSize is not consulted.
+        protected override MessageBuffer OnCreateBufferedCopy(int maxBufferSize)
+        {
+            if (this.bodyStream != null)
+            {
+                int len = (int)this.bodyStream.Length;
+                byte[] buf = new byte[len];
+
+                // Stream.Read may deliver fewer bytes than requested, so keep reading
+                // until the buffer is full or the stream reports end of data.
+                int total = 0;
+                while (total < len)
+                {
+                    int read = this.bodyStream.Read(buf, total, len - total);
+                    if (read <= 0)
+                    {
+                        break;
+                    }
+
+                    total += read;
+                }
+
+                this.bodyStream = null;
+                this.bodyBytes = buf;
+                this.count = total;   // only the bytes actually read form the body
+                this.index = 0;
+            }
+            else if (this.bufferManager != null)
+            {
+                // we could take steps to share the buffer among copies and release the memory
+                // after the last user finishes by a reference count or such, but we are already
+                // far from the intended optimized use.  Make one GC managed memory copy that is
+                // shared by all.
+                byte[] buf = new byte[this.count];
+
+                Buffer.BlockCopy(this.bodyBytes, this.index, buf, 0, this.count);
+                this.bufferManager.ReturnBuffer(this.bodyBytes);
+                this.bufferManager = null;
+                this.bodyBytes = buf;
+                this.index = 0;
+            }
+
+            return new RawMessageBuffer(this.headers, this.properties, this.bodyBytes, this.index, this.count, this.readerQuotas);
+        }
+
+        // Returns a RawXmlReader over the body.  For a stream body the reader is
+        // created with ownsStream == false.  For a byte array body a stream is created
+        // for the reader with ownsStream == true; if the bytes are on loan from a
+        // BufferManager, responsibility for returning them moves to that stream.
+        protected override XmlDictionaryReader OnGetReaderAtBodyContents()
+        {
+            if (this.bodyStream != null)
+            {
+                return new RawXmlReader(this.bodyStream, this.readerQuotas, false);
+            }
+
+            // create stream for duration of XmlReader.
+            Stream ownedStream;
+            if (this.bufferManager == null)
+            {
+                ownedStream = new MemoryStream(this.bodyBytes, this.index, this.count, false);
+            }
+            else
+            {
+                ownedStream = new RawMemoryStream(this.bodyBytes, this.index, this.count, this.bufferManager);
+                this.bufferManager = null;
+            }
+
+            return new RawXmlReader(ownedStream, this.readerQuotas, true);
+        }
+
+        // Writes the body as base64 text inside a RawMessageEncoder.StreamElementName
+        // element.  A stream body is first read into a temporary array.
+        protected override void OnWriteBodyContents(XmlDictionaryWriter writer)
+        {
+            writer.WriteStartElement(RawMessageEncoder.StreamElementName, string.Empty);
+            if (this.bodyStream != null)
+            {
+                int len = (int)this.bodyStream.Length;
+                byte[] buf = new byte[len];
+
+                // Stream.Read may deliver fewer bytes than requested, so keep reading
+                // until the buffer is full or the stream reports end of data.
+                int total = 0;
+                while (total < len)
+                {
+                    int read = this.bodyStream.Read(buf, total, len - total);
+                    if (read <= 0)
+                    {
+                        break;
+                    }
+
+                    total += read;
+                }
+
+                // Emit only the bytes that were actually read.
+                writer.WriteBase64(buf, 0, total);
+            }
+            else
+            {
+                writer.WriteBase64(this.bodyBytes, this.index, this.count);
+            }
+
+            writer.WriteEndElement();
+        }
+
+        // Read-only MemoryStream over a buffer borrowed from a BufferManager.  The
+        // buffer is handed back to the manager the first time the stream is disposed.
+        private class RawMemoryStream : MemoryStream
+        {
+            private BufferManager bufferManager;
+            private byte[] buffer;
+
+            public RawMemoryStream(byte[] bytes, int index, int count, BufferManager mgr)
+                : base(bytes, index, count, false)
+            {
+                this.bufferManager = mgr;
+                this.buffer = bytes;
+            }
+
+            // Returns the borrowed buffer at most once, and always chains to the base
+            // class, including when there is no manager or the buffer was already
+            // returned by an earlier call.
+            protected override void Dispose(bool disposing)
+            {
+                BufferManager lender = this.bufferManager;
+                this.bufferManager = null;
+
+                try
+                {
+                    if (lender != null)
+                    {
+                        lender.ReturnBuffer(this.buffer);
+                    }
+                }
+                finally
+                {
+                    base.Dispose(disposing);
+                }
+            }
+        }
+
+        // MessageBuffer over an in-memory body.  It holds its own copies of the
+        // headers, properties and reader quotas, and refers to the body byte array it
+        // is given.  Each CreateMessage() call produces a new RawMessage on that body.
+        private class RawMessageBuffer : MessageBuffer
+        {
+            private bool closed;
+            private MessageHeaders headers;
+            private MessageProperties properties;
+            private byte[] bodyBytes;
+            private int index;
+            private int count;
+            private XmlDictionaryReaderQuotas readerQuotas;
+
+            public RawMessageBuffer(MessageHeaders headers, MessageProperties properties, byte[] bytes, int index, int count, XmlDictionaryReaderQuotas quotas)
+                : base()
+            {
+                this.headers = new MessageHeaders(headers);
+                this.properties = new MessageProperties(properties);
+                this.bodyBytes = bytes;
+                this.index = index;
+                this.count = count;
+                this.readerQuotas = new XmlDictionaryReaderQuotas();
+
+                // The source message may carry no quotas; keep the defaults then
+                // instead of failing on a null reference.
+                if (quotas != null)
+                {
+                    quotas.CopyTo(this.readerQuotas);
+                }
+            }
+
+            // Size of the buffered body in bytes.
+            public override int BufferSize
+            {
+                get { return this.count; }
+            }
+
+            // Releases the buffer's references and disposes its properties.  Calling
+            // Close() again has no further effect.
+            public override void Close()
+            {
+                if (this.closed)
+                {
+                    return;
+                }
+
+                this.closed = true;
+                this.headers = null;
+                if (this.properties != null)
+                {
+                    this.properties.Dispose();
+                    this.properties = null;
+                }
+
+                this.bodyBytes = null;
+                this.readerQuotas = null;
+            }
+
+            // Throws ObjectDisposedException after Close().
+            public override Message CreateMessage()
+            {
+                if (this.closed)
+                {
+                    throw new ObjectDisposedException("message");
+                }
+
+                return new RawMessage(this.headers, this.properties, this.bodyBytes, this.index, this.count, this.readerQuotas);
+            }
+        }
+    }
+}

Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoder.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoder.cs?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoder.cs (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoder.cs Wed Sep  2 23:38:03 2009
@@ -0,0 +1,113 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+    using System;
+    using System.IO;
+    using System.ServiceModel.Channels;
+    using System.ServiceModel;
+    using System.Xml;
+
+
+    // MessageEncoder that wraps incoming bytes or streams in RawMessage objects and
+    // writes outgoing messages through RawXmlWriter.  It reports no content type or
+    // media type, accepts every content type, and uses MessageVersion.None.
+    class RawMessageEncoder : MessageEncoder
+    {
+        public const string StreamElementName = "Binary";
+
+        XmlDictionaryReaderQuotas readerQuotas;
+
+        // Keeps a private copy of the supplied quotas; defaults are used when quotas is null.
+        public RawMessageEncoder(XmlDictionaryReaderQuotas quotas)
+        {
+            this.readerQuotas = new XmlDictionaryReaderQuotas();
+            if (quotas != null)
+            {
+                quotas.CopyTo(this.readerQuotas);
+            }
+        }
+
+        public override string ContentType
+        {
+            get { return null; }
+        }
+
+        public override bool IsContentTypeSupported(string contentType)
+        {
+            return true;
+        }
+
+        public override string MediaType
+        {
+            get { return null; }
+        }
+
+        public override MessageVersion MessageVersion
+        {
+            get { return MessageVersion.None; }
+        }
+
+        // Wraps the given buffer segment in a RawMessage; contentType is not used.
+        public override Message ReadMessage(ArraySegment<byte> buffer, BufferManager bufferManager, string contentType)
+        {
+            RawMessage message = new RawMessage(buffer.Array, buffer.Offset, buffer.Count, bufferManager, readerQuotas);
+            message.Properties.Encoder = this;
+            return message;
+        }
+
+        // Wraps the given stream in a RawMessage; maxSizeOfHeaders and contentType are not used.
+        public override Message ReadMessage(Stream stream, int maxSizeOfHeaders, string contentType)
+        {
+            RawMessage message = new RawMessage(stream, readerQuotas);
+            message.Properties.Encoder = this;
+            return message;
+        }
+
+        // Throws InvalidDataException when the reader is not positioned on a node of
+        // the expected type.  The message reports the actual type first and the
+        // expected type second.
+        private void CheckType(XmlDictionaryReader reader, XmlNodeType type)
+        {
+            if (reader.NodeType != type)
+            {
+                throw new System.IO.InvalidDataException(String.Format("RawMessageEncoder xml check {0} type should be {1}", reader.NodeType, type));
+            }
+        }
+
+        // Encodes the message into a buffer taken from bufferManager, leaving
+        // messageOffset bytes free at the front.  The returned segment covers only the
+        // encoded bytes.  maxMessageSize is not enforced here.
+        public override ArraySegment<byte> WriteMessage(Message message, int maxMessageSize, BufferManager bufferManager, int messageOffset)
+        {
+            // Encode into a scratch stream first to learn the encoded length.
+            MemoryStream scratch = new MemoryStream();
+            this.WriteMessage(message, scratch);
+            int payloadLength = (int)scratch.Length;
+
+            byte[] pooled = bufferManager.TakeBuffer(messageOffset + payloadLength);
+            MemoryStream target = new MemoryStream(pooled);
+            if (messageOffset > 0)
+            {
+                target.Seek(messageOffset, SeekOrigin.Begin);
+            }
+
+            scratch.WriteTo(target);
+            target.Close();
+
+            return new ArraySegment<byte>(pooled, messageOffset, payloadLength);
+        }
+
+        // Writes the message to the stream through a RawXmlWriter.
+        public override void WriteMessage(Message message, Stream stream)
+        {
+            using (XmlWriter writer = new RawXmlWriter(stream))
+            {
+                message.WriteMessage(writer);
+                writer.Flush();
+            }
+        }
+    }
+}

Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoder.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoder.cs
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoderFactory.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoderFactory.cs?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoderFactory.cs (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoderFactory.cs Wed Sep  2 23:38:03 2009
@@ -0,0 +1,45 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+    using System;
+    using System.Xml;
+    using System.ServiceModel.Channels;
+
+    /// <summary>
+    /// Encoder factory that creates one <see cref="RawMessageEncoder"/> at construction
+    /// time and returns that same instance from <see cref="Encoder"/>.
+    /// </summary>
+    internal class RawMessageEncoderFactory : MessageEncoderFactory
+    {
+        private RawMessageEncoder encoder;
+
+        /// <summary>
+        /// Creates the factory together with its single encoder.
+        /// </summary>
+        /// <param name="quotas">Reader quotas forwarded to the encoder constructor.</param>
+        public RawMessageEncoderFactory(XmlDictionaryReaderQuotas quotas)
+        {
+            RawMessageEncoder rawEncoder = new RawMessageEncoder(quotas);
+            this.encoder = rawEncoder;
+        }
+
+        /// <summary>
+        /// Gets the message version reported by the encoder.
+        /// </summary>
+        public override MessageVersion MessageVersion
+        {
+            get
+            {
+                return this.encoder.MessageVersion;
+            }
+        }
+
+        /// <summary>
+        /// Gets the encoder created by the constructor.
+        /// </summary>
+        public override MessageEncoder Encoder
+        {
+            get
+            {
+                return this.encoder;
+            }
+        }
+    }
+}

Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoderFactory.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoderFactory.cs
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncodingBindingElement.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncodingBindingElement.cs?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncodingBindingElement.cs (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncodingBindingElement.cs Wed Sep  2 23:38:03 2009
@@ -0,0 +1,102 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+    using System;
+    using System.ServiceModel.Channels;
+
+    /// <summary>
+    /// Binding element that selects the raw message encoding. It carries no
+    /// configurable state and only supports <see cref="MessageVersion.None"/>.
+    /// </summary>
+    public class RawMessageEncodingBindingElement : MessageEncodingBindingElement
+    {
+        /// <summary>
+        /// Creates a new raw encoding binding element.
+        /// </summary>
+        public RawMessageEncodingBindingElement()
+            : base()
+        {
+        }
+
+        // Copy constructor used by Clone(); there is no state to copy.
+        private RawMessageEncodingBindingElement(RawMessageEncodingBindingElement originalBindingElement)
+        {
+        }
+
+        /// <summary>
+        /// Gets the supported message version (always None). The setter rejects
+        /// any other value with an <see cref="ArgumentException"/>.
+        /// </summary>
+        public override MessageVersion MessageVersion
+        {
+            get
+            {
+                return MessageVersion.None;
+            }
+
+            set
+            {
+                if (value == MessageVersion.None)
+                {
+                    return;
+                }
+
+                throw new ArgumentException("Unsupported message version");
+            }
+        }
+
+        /// <summary>
+        /// Returns a new element produced by the private copy constructor.
+        /// </summary>
+        public override BindingElement Clone()
+        {
+            RawMessageEncodingBindingElement copy = new RawMessageEncodingBindingElement(this);
+            return copy;
+        }
+
+        /// <summary>
+        /// Creates the encoder factory; no reader quotas are supplied (null is passed).
+        /// </summary>
+        public override MessageEncoderFactory CreateMessageEncoderFactory()
+        {
+            return new RawMessageEncoderFactory(null);
+        }
+
+        /// <summary>
+        /// Reports whether the inner elements can build a channel factory.
+        /// </summary>
+        public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
+        {
+            if (context == null)
+            {
+                throw new ArgumentNullException("context");
+            }
+
+            return context.CanBuildInnerChannelFactory<TChannel>();
+        }
+
+        /// <summary>
+        /// Adds this element to the binding parameters and delegates factory
+        /// construction to the inner elements.
+        /// </summary>
+        public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
+        {
+            if (context == null)
+            {
+                throw new ArgumentNullException("context");
+            }
+
+            context.BindingParameters.Add(this);
+            return context.BuildInnerChannelFactory<TChannel>();
+        }
+
+        /// <summary>
+        /// Reports whether the inner elements can build a channel listener.
+        /// Unlike CanBuildChannelFactory, this adds the element to the binding
+        /// parameters before asking.
+        /// </summary>
+        public override bool CanBuildChannelListener<TChannel>(BindingContext context)
+        {
+            if (context == null)
+            {
+                throw new ArgumentNullException("context");
+            }
+
+            context.BindingParameters.Add(this);
+            return context.CanBuildInnerChannelListener<TChannel>();
+        }
+
+        /// <summary>
+        /// Adds this element to the binding parameters and delegates listener
+        /// construction to the inner elements.
+        /// </summary>
+        public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
+        {
+            if (context == null)
+            {
+                throw new ArgumentNullException("context");
+            }
+
+            context.BindingParameters.Add(this);
+            return context.BuildInnerChannelListener<TChannel>();
+        }
+    }
+}

Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncodingBindingElement.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncodingBindingElement.cs
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlReader.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlReader.cs?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlReader.cs (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlReader.cs Wed Sep  2 23:38:03 2009
@@ -0,0 +1,353 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+    using System;
+    using System.IO;
+    using System.Xml;
+
+    // Presents a hardcoded XML InfoSet: "<rawtag>X</rawtag>" where X is the
+    // entire stream content, exposed as base64 text.  The reader walks the
+    // fixed sequence None -> StartElement -> Content -> EndElement -> EOF.
+    internal class RawXmlReader : XmlDictionaryReader
+    {
+        private Stream stream;
+        private bool closed;
+        private bool streamOwner;
+        private ReaderPosition position;
+        private string contentAsBase64;
+        private XmlNameTable xmlNameTable;
+        private XmlDictionaryReaderQuotas readerQuotas;
+
+        // stream:      source of the element content.
+        // quotas:      stored for the lifetime of the reader; a default instance is
+        //              created when null is passed.
+        // streamOwner: when true, Close() also closes the stream.
+        public RawXmlReader(Stream stream, XmlDictionaryReaderQuotas quotas, bool streamOwner)
+        {
+            this.stream = stream;
+            this.streamOwner = streamOwner;
+            if (quotas == null)
+            {
+                this.readerQuotas = new XmlDictionaryReaderQuotas();
+            }
+            else
+            {
+                this.readerQuotas = quotas;
+            }
+        }
+
+        private enum ReaderPosition
+        {
+            None,
+            StartElement,
+            Content,
+            EndElement,
+            EOF
+        }
+
+        // The single element never carries attributes.
+        public override int AttributeCount
+        {
+            get { return 0; }
+        }
+
+        public override string BaseURI
+        {
+            get { return string.Empty; }
+        }
+
+        // Only the text content is nested inside the element.
+        public override int Depth
+        {
+            get { return (this.position == ReaderPosition.Content) ? 1 : 0; }
+        }
+
+        public override bool EOF
+        {
+            get { return this.position == ReaderPosition.EOF; }
+        }
+
+        public override bool HasAttributes
+        {
+            get { return false; }
+        }
+
+        public override bool HasValue
+        {
+            get { return this.position == ReaderPosition.Content; }
+        }
+
+        public override bool IsEmptyElement
+        {
+            get { return false; }
+        }
+
+        // Name of the current node.  Both the start and the end tag carry the
+        // element name; nodes without a name report the empty string (never
+        // null) as the XmlReader contract requires.
+        public override string LocalName
+        {
+            get
+            {
+                switch (this.position)
+                {
+                    case ReaderPosition.StartElement:
+                    case ReaderPosition.EndElement:
+                        return RawMessageEncoder.StreamElementName;
+                    default:
+                        return string.Empty;
+                }
+            }
+        }
+
+        public override string NamespaceURI
+        {
+            get { return string.Empty; }
+        }
+
+        // Created lazily and seeded with the element name.
+        public override XmlNameTable NameTable
+        {
+            get
+            {
+                if (this.xmlNameTable == null)
+                {
+                    this.xmlNameTable = new NameTable();
+                    this.xmlNameTable.Add(RawMessageEncoder.StreamElementName);
+                }
+
+                return this.xmlNameTable;
+            }
+        }
+
+        public override XmlNodeType NodeType
+        {
+            get
+            {
+                switch (this.position)
+                {
+                    case ReaderPosition.StartElement:
+                        return XmlNodeType.Element;
+                    case ReaderPosition.Content:
+                        return XmlNodeType.Text;
+                    case ReaderPosition.EndElement:
+                        return XmlNodeType.EndElement;
+                    default:
+                        // ReaderPosition.None and ReaderPosition.EOF
+                        return XmlNodeType.None;
+                }
+            }
+        }
+
+        public override string Prefix
+        {
+            get { return string.Empty; }
+        }
+
+        // Reaching the end of the document is reported as EndOfFile; Closed is
+        // reserved for after Close() has run.  XmlReader.Dispose only calls
+        // Close() while ReadState is not Closed, so reporting Closed early
+        // would leave an owned stream open.
+        public override ReadState ReadState
+        {
+            get
+            {
+                switch (this.position)
+                {
+                    case ReaderPosition.None:
+                        return ReadState.Initial;
+                    case ReaderPosition.StartElement:
+                    case ReaderPosition.Content:
+                    case ReaderPosition.EndElement:
+                        return ReadState.Interactive;
+                    case ReaderPosition.EOF:
+                        return this.closed ? ReadState.Closed : ReadState.EndOfFile;
+                    default:
+                        return ReadState.Error;
+                }
+            }
+        }
+
+        // Base64 text of the stream content while positioned on the content
+        // node, otherwise the empty string.  The text is computed once and
+        // cached.  NOTE(review): computing it consumes the stream and may move
+        // the reader on to the end element (see ReadContentAsBase64 below).
+        public override string Value
+        {
+            get
+            {
+                switch (this.position)
+                {
+                    case ReaderPosition.Content:
+                        if (this.contentAsBase64 == null)
+                        {
+                            this.contentAsBase64 = Convert.ToBase64String(this.ReadContentAsBase64());
+                        }
+
+                        return this.contentAsBase64;
+
+                    default:
+                        return string.Empty;
+                }
+            }
+        }
+
+        // Idempotent.  Closes the underlying stream only when this reader owns it.
+        public override void Close()
+        {
+            if (!this.closed)
+            {
+                this.closed = true;
+                this.position = ReaderPosition.EOF;
+                this.readerQuotas = null;
+                if (this.streamOwner)
+                {
+                    this.stream.Close();
+                }
+            }
+        }
+
+        // There are no attributes, so every index is out of range.
+        public override string GetAttribute(int i)
+        {
+            throw new ArgumentOutOfRangeException("i", i, "Argument not in set of valid values");
+        }
+
+        public override string GetAttribute(string name, string namespaceURI)
+        {
+            return null;
+        }
+
+        public override string GetAttribute(string name)
+        {
+            return null;
+        }
+
+        // Only the empty prefix and the two reserved XML prefixes resolve.
+        public override string LookupNamespace(string prefix)
+        {
+            if (prefix == string.Empty)
+            {
+                return string.Empty;
+            }
+            else if (prefix == "xml")
+            {
+                return "http://www.w3.org/XML/1998/namespace";
+            }
+            else if (prefix == "xmlns")
+            {
+                return "http://www.w3.org/2000/xmlns/";
+            }
+            else
+            {
+                return null;
+            }
+        }
+
+        public override bool MoveToAttribute(string name, string ns)
+        {
+            return false;
+        }
+
+        public override bool MoveToAttribute(string name)
+        {
+            return false;
+        }
+
+        // From the initial position this advances onto the start element;
+        // from any other position it does nothing and returns false.
+        public override bool MoveToElement()
+        {
+            if (this.position == ReaderPosition.None)
+            {
+                this.position = ReaderPosition.StartElement;
+                return true;
+            }
+
+            return false;
+        }
+
+        public override bool MoveToFirstAttribute()
+        {
+            return false;
+        }
+
+        public override bool MoveToNextAttribute()
+        {
+            return false;
+        }
+
+        // Advances one step through the fixed node sequence.  Returns false
+        // once there is no further node to move to.
+        public override bool Read()
+        {
+            switch (this.position)
+            {
+                case ReaderPosition.None:
+                    this.position = ReaderPosition.StartElement;
+                    return true;
+                case ReaderPosition.StartElement:
+                    this.position = ReaderPosition.Content;
+                    return true;
+                case ReaderPosition.Content:
+                    this.position = ReaderPosition.EndElement;
+                    return true;
+                case ReaderPosition.EndElement:
+                    this.position = ReaderPosition.EOF;
+                    return false;
+                case ReaderPosition.EOF:
+                    return false;
+                default:
+                    return false;
+            }
+        }
+
+        public override bool ReadAttributeValue()
+        {
+            return false;
+        }
+
+        // Copies up to count raw bytes from the stream into buffer.  Valid only
+        // while positioned on the content node.  When the stream reports no
+        // more data the reader moves on to the end element.
+        public override int ReadContentAsBase64(byte[] buffer, int index, int count)
+        {
+            if (buffer == null)
+            {
+                throw new ArgumentNullException("buffer");
+            }
+
+            if (this.position != ReaderPosition.Content)
+            {
+                throw new InvalidOperationException("XML reader not in Element");
+            }
+
+            if (count == 0)
+            {
+                return 0;
+            }
+
+            int readCount = this.stream.Read(buffer, index, count);
+            if (readCount == 0)
+            {
+                this.position = ReaderPosition.EndElement;
+            }
+
+            return readCount;
+        }
+
+        public override int ReadContentAsBinHex(byte[] buffer, int index, int count)
+        {
+            throw new NotSupportedException();
+        }
+
+        public override void ResolveEntity()
+        {
+            throw new NotSupportedException();
+        }
+
+        // Reports how many content bytes are still available.  The whole
+        // stream is this one element, so that is whatever lies between the
+        // current stream position and its end (not the total length, which
+        // would over-report once the stream has been partially consumed).
+        // Fails for closed readers, non-seekable streams and lengths that do
+        // not fit in an int.
+        public override bool TryGetBase64ContentLength(out int length)
+        {
+            if (!this.closed && this.stream.CanSeek)
+            {
+                long remaining = this.stream.Length - this.stream.Position;
+                if (remaining >= 0 && remaining <= int.MaxValue)
+                {
+                    length = (int)remaining;
+                    return true;
+                }
+            }
+
+            length = -1;
+            return false;
+        }
+    }
+}

Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlReader.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlReader.cs
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlWriter.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlWriter.cs?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlWriter.cs (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlWriter.cs Wed Sep  2 23:38:03 2009
@@ -0,0 +1,221 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+    using System;
+    using System.IO;
+    using System.Xml;
+
+    // XML writer that accepts exactly one element named
+    // RawMessageEncoder.StreamElementName with base64 content, and copies the
+    // content bytes unchanged to the underlying stream.  No markup is emitted.
+    // Every other kind of XML content is rejected with NotSupportedException.
+    internal sealed class RawXmlWriter : XmlDictionaryWriter
+    {
+        private WriteState state;
+        private Stream stream;
+        private bool closed;
+
+        // True only between WriteStartElement and the matching end element.
+        private bool rawWritingEnabled;
+
+        // stream: destination of the raw bytes; must not be null.
+        public RawXmlWriter(Stream stream)
+        {
+            if (stream == null)
+            {
+                // The name passed here has to match the parameter name exactly.
+                throw new ArgumentNullException("stream");
+            }
+
+            this.stream = stream;
+            this.state = WriteState.Start;
+        }
+
+        public override WriteState WriteState
+        {
+            get
+            {
+                return this.state;
+            }
+        }
+
+        // Idempotent.  Marks the writer closed; the underlying stream is left
+        // untouched.
+        public override void Close()
+        {
+            if (!this.closed)
+            {
+                this.closed = true;
+                this.state = WriteState.Closed;
+                this.rawWritingEnabled = false;
+            }
+        }
+
+        // Flushes the underlying stream.
+        public override void Flush()
+        {
+            this.ThrowIfClosed();
+            this.stream.Flush();
+        }
+
+        public override string LookupPrefix(string ns)
+        {
+            return null;
+        }
+
+        // Writes count bytes of buffer, starting at index, straight to the
+        // stream.  Only legal inside the element.
+        public override void WriteBase64(byte[] buffer, int index, int count)
+        {
+            if (buffer == null)
+            {
+                throw new ArgumentNullException("buffer");
+            }
+
+            this.ThrowIfClosed();
+
+            if (!this.rawWritingEnabled)
+            {
+                throw new InvalidOperationException("XmlWriter not in Element");
+            }
+
+            this.stream.Write(buffer, index, count);
+            this.state = WriteState.Content;
+        }
+
+        // Opens the one permitted element.  Rejects a second start element and
+        // any prefix, namespace or local name other than the expected one.
+        public override void WriteStartElement(string prefix, string localName, string ns)
+        {
+            this.ThrowIfClosed();
+            if (this.state != WriteState.Start)
+            {
+                throw new InvalidOperationException("Start Element Already Called");
+            }
+
+            if (!string.IsNullOrEmpty(prefix) || !string.IsNullOrEmpty(ns) || localName != RawMessageEncoder.StreamElementName)
+            {
+                throw new XmlException("Wrong XML Start Element Name");
+            }
+
+            this.state = WriteState.Element;
+            this.rawWritingEnabled = true;
+        }
+
+        // Ends the element; further WriteBase64 calls are rejected.
+        public override void WriteEndElement()
+        {
+            this.ThrowIfClosed();
+            if (!this.rawWritingEnabled)
+            {
+                throw new InvalidOperationException("Unexpected End Element");
+            }
+
+            this.rawWritingEnabled = false;
+        }
+
+        public override void WriteFullEndElement()
+        {
+            this.WriteEndElement();
+        }
+
+        // The document-level calls below emit nothing; they only make sure
+        // raw writing is switched off.  Close() already clears the flag, so
+        // checking for a closed writer first does not change the outcome.
+        public override void WriteEndDocument()
+        {
+            this.ThrowIfClosed();
+            this.rawWritingEnabled = false;
+        }
+
+        public override void WriteStartDocument()
+        {
+            this.ThrowIfClosed();
+            this.rawWritingEnabled = false;
+        }
+
+        public override void WriteStartDocument(bool standalone)
+        {
+            this.ThrowIfClosed();
+            this.rawWritingEnabled = false;
+        }
+
+        private void ThrowIfClosed()
+        {
+            if (this.closed)
+            {
+                throw new InvalidOperationException("XML Writer closed");
+            }
+        }
+
+        // Everything below is XML content this writer cannot represent.
+
+        public override void WriteString(string text)
+        {
+            throw new NotSupportedException();
+        }
+
+        public override void WriteCData(string text)
+        {
+            throw new NotSupportedException();
+        }
+
+        public override void WriteCharEntity(char ch)
+        {
+            throw new NotSupportedException();
+        }
+
+        public override void WriteChars(char[] buffer, int index, int count)
+        {
+            throw new NotSupportedException();
+        }
+
+        public override void WriteComment(string text)
+        {
+            throw new NotSupportedException();
+        }
+
+        public override void WriteDocType(string name, string pubid, string sysid, string subset)
+        {
+            throw new NotSupportedException();
+        }
+
+        public override void WriteEndAttribute()
+        {
+            throw new NotSupportedException();
+        }
+
+        public override void WriteEntityRef(string name)
+        {
+            throw new NotSupportedException();
+        }
+
+        public override void WriteProcessingInstruction(string name, string text)
+        {
+            throw new NotSupportedException();
+        }
+
+        public override void WriteRaw(string data)
+        {
+            throw new NotSupportedException();
+        }
+
+        public override void WriteRaw(char[] buffer, int index, int count)
+        {
+            throw new NotSupportedException();
+        }
+
+        public override void WriteStartAttribute(string prefix, string localName, string ns)
+        {
+            throw new NotSupportedException();
+        }
+
+        public override void WriteSurrogateCharEntity(char lowChar, char highChar)
+        {
+            throw new NotSupportedException();
+        }
+
+        public override void WriteWhitespace(string ws)
+        {
+            throw new NotSupportedException();
+        }
+    }
+}

Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlWriter.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/RawXmlWriter.cs
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp Wed Sep  2 23:38:03 2009
@@ -0,0 +1,165 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/framing/FrameSet.h"
+
+#include "AmqpConnection.h"
+#include "AmqpSession.h"
+#include "QpidMarshal.h"
+#include "QpidException.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace msclr;
+
+using namespace qpid::client;
+using namespace std;
+
+
+// Note on locks: Use "this" for fast counting and idle/busy
+// notifications.  Use the "sessions" list to serialize session
+// creation/reaping and overall tear down.
+// TODO: switch "this" lock to separate non-visible Object.
+
+
+// Opens a native qpid connection to server:port and records the negotiated
+// maximum frame size.  Any failure is reported as a managed exception after
+// the partially built connection has been torn down by Cleanup().
+AmqpConnection::AmqpConnection(String^ server, int port) :
+    connectionp(NULL),
+    busyCount(0),
+    disposed(false)
+{
+    bool success = false;
+    System::Exception^ openException = nullptr;
+    sessions = gcnew Collections::Generic::List<AmqpSession^>();
+
+    try {
+        connectionp = new Connection;
+        connectionp->open (QpidMarshal::ToNative(server), port);
+        // TODO: registerFailureCallback for failover
+        const ConnectionSettings& settings = connectionp->getNegotiatedSettings();
+        this->maxFrameSize = settings.maxFrameSize;
+        // Flag success only once every step above has completed, so that an
+        // exception from reading the settings is reported instead of being
+        // swallowed by the handlers below.
+        isOpen = true;
+        success = true;
+    } catch (const qpid::Exception& error) {
+        String^ errmsg = gcnew String(error.what());
+        openException = gcnew QpidException(errmsg);
+    } finally {
+        if (!success) {
+            Cleanup();
+            if (openException == nullptr) {
+                // failure that was not a qpid::Exception
+                openException = gcnew QpidException ("unknown connection failure");
+            }
+            throw openException;
+        }
+    }
+}
+
+// Tears the connection down exactly once: marks it disposed and no longer
+// open, notifies every child session, then closes and frees the native
+// connection.  Later calls return immediately.
+void AmqpConnection::Cleanup()
+{
+    {
+        lock l(sessions);
+        if (disposed)
+            return;
+        disposed = true;
+        isOpen = false;
+    }
+
+    try {
+        // let the child sessions clean up
+        for each(AmqpSession^ s in sessions) {
+            s->ConnectionClosed();
+        }
+    }
+    finally
+    {
+        // release the native connection even if a session notification threw
+        if (connectionp != NULL) {
+            connectionp->close();
+            delete connectionp;
+            connectionp = NULL;
+        }
+    }
+}
+
+// Destructor: releases the connection through Cleanup().
+AmqpConnection::~AmqpConnection()
+{
+    Cleanup();
+}
+
+// Finalizer: performs the same Cleanup() as the destructor.
+AmqpConnection::!AmqpConnection()
+{
+    Cleanup();
+}
+
+// Explicit close: runs Cleanup() and then tells the GC that the finalizer
+// no longer needs to run for this object.
+void AmqpConnection::Close()
+{
+    // Simulate Dispose()...
+    Cleanup();
+    GC::SuppressFinalize(this);
+}
+
+// Creates a child session on this connection and records it in the
+// sessions list.  Throws ObjectDisposedException once the connection has
+// been cleaned up.  The sessions list lock is held throughout.
+AmqpSession^ AmqpConnection::CreateSession()
+{
+    lock guard(sessions);
+    if (!disposed) {
+        AmqpSession^ created = gcnew AmqpSession(this, connectionp);
+        sessions->Add(created);
+        return created;
+    }
+    throw gcnew ObjectDisposedException("AmqpConnection");
+}
+
+// called whenever a child session becomes newly busy (a first reader or writer since last idle)
+
+// Counts one more busy child session.  The counter is updated under the
+// "this" lock, the same lock NotifyIdle uses.
+void AmqpConnection::NotifyBusy()
+{
+    lock l(this);
+    busyCount++;
+}
+
+// called whenever a child session becomes newly idle (a last reader or writer has closed)
+// The connection is idle when none of its child sessions are busy
+
+// Counts one less busy child session and raises OnConnectionIdle when the
+// count reaches zero.  The event is raised after the lock has been released.
+void AmqpConnection::NotifyIdle()
+{
+    bool becameIdle;
+    {
+        lock guard(this);
+        becameIdle = (--busyCount == 0);
+    }
+
+    if (!becameIdle) {
+        return;
+    }
+
+    OnConnectionIdle(this, System::EventArgs::Empty);
+}
+
+
+}}} // namespace Apache::Qpid::Interop

Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h?rev=810734&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h Wed Sep  2 23:38:03 2009
@@ -0,0 +1,71 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace std;
+using namespace qpid::client;
+
+ref class AmqpSession;
+
+public delegate void ConnectionIdleEventHandler(Object^ sender, EventArgs^ eventArgs);
+
+// Managed wrapper around a native qpid client Connection.  Child sessions
+// are created through CreateSession() and report their busy/idle transitions
+// back through NotifyBusy()/NotifyIdle().
+public ref class AmqpConnection
+{
+private:
+    // native connection pointer
+    Connection* connectionp;
+    void Cleanup();
+    bool disposed;
+    // child sessions created by CreateSession()
+    Collections::Generic::List<AmqpSession^>^ sessions;
+    // backing field for the IsOpen property
+    bool isOpen;
+    // backing field for the IsIdle property (idle when zero)
+    int busyCount;
+    // backing field for the MaxFrameSize property
+    int maxFrameSize;
+
+ internal:
+    void NotifyBusy();
+    void NotifyIdle();
+
+    property int MaxFrameSize {
+	int get () { return maxFrameSize; }
+    }
+
+public:  
+    AmqpConnection(System::String^ server, int port);
+    ~AmqpConnection();
+    !AmqpConnection();
+    void Close();
+    AmqpSession^ CreateSession();
+    // event declared with the ConnectionIdleEventHandler delegate type
+    event ConnectionIdleEventHandler^ OnConnectionIdle;
+
+    property bool IsOpen {
+	bool get() { return isOpen; }
+    };
+
+    // reads busyCount without taking a lock
+    property bool IsIdle {
+	bool get() { return (busyCount == 0); }
+    }
+};
+
+
+}}} // namespace Apache::Qpid::Interop



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org