You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2010/10/25 01:29:38 UTC

svn commit: r1026915 - in /qpid/trunk/qpid/wcf: samples/Integration/ samples/Integration/Drain/ samples/Integration/Spout/ samples/Integration/Util/ src/Apache/Qpid/AmqpTypes/ src/Apache/Qpid/Channel/ src/Apache/Qpid/Interop/

Author: cliffjansen
Date: Sun Oct 24 23:29:37 2010
New Revision: 1026915

URL: http://svn.apache.org/viewvc?rev=1026915&view=rev
Log:
QPID-2646 patches

Added:
    qpid/trunk/qpid/wcf/samples/Integration/
    qpid/trunk/qpid/wcf/samples/Integration/Drain/
    qpid/trunk/qpid/wcf/samples/Integration/Drain/Drain.cs
    qpid/trunk/qpid/wcf/samples/Integration/Drain/Drain.csproj
    qpid/trunk/qpid/wcf/samples/Integration/Integration.sln
    qpid/trunk/qpid/wcf/samples/Integration/Spout/
    qpid/trunk/qpid/wcf/samples/Integration/Spout/Spout.cs
    qpid/trunk/qpid/wcf/samples/Integration/Spout/Spout.csproj
    qpid/trunk/qpid/wcf/samples/Integration/Util/
    qpid/trunk/qpid/wcf/samples/Integration/Util/Options.cs
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h
Modified:
    qpid/trunk/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h

Added: qpid/trunk/qpid/wcf/samples/Integration/Drain/Drain.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/samples/Integration/Drain/Drain.cs?rev=1026915&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/samples/Integration/Drain/Drain.cs (added)
+++ qpid/trunk/qpid/wcf/samples/Integration/Drain/Drain.cs Sun Oct 24 23:29:37 2010
@@ -0,0 +1,146 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Samples.Integration
+{
+    using System;
+    using System.Collections;
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using System.IO;
+    using System.ServiceModel;
+    using System.ServiceModel.Channels;
+    using System.ServiceModel.Description;
+    using System.Text;
+    using System.Xml;
+    using Apache.Qpid.Channel;
+    using Apache.Qpid.AmqpTypes;
+
+    class Drain
+    {
+        // Appends s to sb, writing a ", " delimiter first unless sb is still empty.
+        private static void Append(StringBuilder sb, string s)
+        {
+            bool needsDelimiter = sb.Length != 0;
+            if (needsDelimiter)
+            {
+                sb.Append(", ");
+            }
+            sb.Append(s);
+        }
+
+        // Renders the application property map as "key:value" pairs joined
+        // by ", ". Returns an empty string when the message has no map.
+        private static string MessagePropertiesAsString(AmqpProperties props)
+        {
+            StringBuilder text = new StringBuilder();
+
+            if (props.PropertyMap == null)
+            {
+                return text.ToString();
+            }
+
+            foreach (KeyValuePair<string, AmqpType> entry in props.PropertyMap)
+            {
+                // AmqpString exposes its text through Value; every other
+                // AMQP type is rendered with its own ToString().
+                string rendered = (entry.Value is AmqpString)
+                    ? ((AmqpString)entry.Value).Value
+                    : entry.Value.ToString();
+
+                string pair = entry.Key + ":" + rendered;
+                Append(text, pair);
+            }
+
+            return text.ToString();
+        }
+
+        // Extracts the message body and returns it as text. Bodies whose content
+        // type is "amqp/map" are reported with a placeholder instead of decoded.
+        private static string MessageContentAsString(Message msg, AmqpProperties props)
+        {
+            // AmqpBinaryBinding provides message content as a single XML "Binary" element
+            XmlDictionaryReader bodyReader = msg.GetReaderAtBodyContents();
+
+            // advance until the reader sits on a node that carries a value
+            while (!bodyReader.HasValue)
+            {
+                bodyReader.Read();
+                if (bodyReader.EOF)
+                {
+                    throw new InvalidDataException("empty reader for message");
+                }
+            }
+
+            byte[] payload = bodyReader.ReadContentAsBase64();
+            if ("amqp/map".Equals(props.ContentType))
+            {
+                return "mapdata (coming soon)";
+            }
+
+            // any other content type is decoded as UTF-8 text
+            return Encoding.UTF8.GetString(payload);
+        }
+
+        // Receives messages from the address given on the command line and prints
+        // each one until TryReceive reports no message within the timeout.
+        static void Main(string[] args)
+        {
+            try
+            {
+                Options options = new Options(args);
+
+                AmqpBinaryBinding binding = new AmqpBinaryBinding();
+                binding.BrokerHost = options.Broker;
+                binding.BrokerPort = options.Port;
+                binding.TransferMode = TransferMode.Streamed;
+
+                IChannelFactory<IInputChannel> factory = binding.BuildChannelFactory<IInputChannel>();
+                factory.Open();
+                try
+                {
+                    IInputChannel receiver = factory.CreateChannel(options.Address);
+                    receiver.Open();
+
+                    Message message;
+                    while (receiver.TryReceive(options.Timeout, out message))
+                    {
+                        // Message is IDisposable: release it as soon as it has been
+                        // printed instead of leaving every received message open.
+                        using (message)
+                        {
+                            AmqpProperties props = (AmqpProperties)message.Properties["AmqpProperties"];
+                            Console.WriteLine("Message(properties=" +
+                                MessagePropertiesAsString(props) + ", content='" +
+                                MessageContentAsString(message, props) + "')");
+                        }
+                    }
+                }
+                finally
+                {
+                    factory.Close();
+                }
+            }
+            catch (Exception e)
+            {
+                Console.WriteLine("Drain: " + e);
+            }
+        }
+    }
+}

Added: qpid/trunk/qpid/wcf/samples/Integration/Drain/Drain.csproj
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/samples/Integration/Drain/Drain.csproj?rev=1026915&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/samples/Integration/Drain/Drain.csproj (added)
+++ qpid/trunk/qpid/wcf/samples/Integration/Drain/Drain.csproj Sun Oct 24 23:29:37 2010
@@ -0,0 +1,80 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="3.5" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+  <PropertyGroup>
+    <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+    <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+    <ProductVersion>9.0.21022</ProductVersion>
+    <SchemaVersion>2.0</SchemaVersion>
+    <ProjectGuid>{A67D9B60-34A5-462F-84A2-72C22F623749}</ProjectGuid>
+    <OutputType>Exe</OutputType>
+    <RootNamespace>Drain</RootNamespace>
+    <AssemblyName>Drain</AssemblyName>
+    <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+    <FileAlignment>512</FileAlignment>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+    <DebugSymbols>true</DebugSymbols>
+    <DebugType>full</DebugType>
+    <Optimize>false</Optimize>
+    <OutputPath>bin\Debug\</OutputPath>
+    <DefineConstants>DEBUG;TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+    <DebugType>pdbonly</DebugType>
+    <Optimize>true</Optimize>
+    <OutputPath>bin\Release\</OutputPath>
+    <DefineConstants>TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+  </PropertyGroup>
+  <ItemGroup>
+    <Reference Include="Apache.Qpid.Channel, Version=1.0.0.0, Culture=neutral, PublicKeyToken=679e1f50b62dbace, processorArchitecture=MSIL">
+      <SpecificVersion>False</SpecificVersion>
+      <HintPath>..\..\..\src\Apache\Qpid\Channel\bin\Release\Apache.Qpid.Channel.dll</HintPath>
+    </Reference>
+    <Reference Include="Apache.Qpid.Interop, Version=1.0.3796.12140, Culture=neutral, PublicKeyToken=679e1f50b62dbace, processorArchitecture=AMD64">
+      <SpecificVersion>False</SpecificVersion>
+      <HintPath>..\..\..\src\Apache\Qpid\Channel\bin\Release\Apache.Qpid.Interop.dll</HintPath>
+    </Reference>
+    <Reference Include="System" />
+    <Reference Include="System.Runtime.Serialization">
+      <RequiredTargetFramework>3.0</RequiredTargetFramework>
+    </Reference>
+    <Reference Include="System.ServiceModel">
+      <RequiredTargetFramework>3.0</RequiredTargetFramework>
+    </Reference>
+    <Reference Include="System.Xml" />
+  </ItemGroup>
+  <ItemGroup>
+    <Compile Include="Drain.cs" />
+    <Compile Include="..\Util\Options.cs" />
+  </ItemGroup>
+  <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+  <!-- To modify your build process, add your task inside one of the targets below and uncomment it. 
+       Other similar extension points exist, see Microsoft.Common.targets.
+  <Target Name="BeforeBuild">
+  </Target>
+  <Target Name="AfterBuild">
+  </Target>
+  -->
+</Project>

Added: qpid/trunk/qpid/wcf/samples/Integration/Integration.sln
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/samples/Integration/Integration.sln?rev=1026915&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/samples/Integration/Integration.sln (added)
+++ qpid/trunk/qpid/wcf/samples/Integration/Integration.sln Sun Oct 24 23:29:37 2010
@@ -0,0 +1,46 @@
+
+Microsoft Visual Studio Solution File, Format Version 10.00
+
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Visual Studio 2008
+
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Drain", "Drain\Drain.csproj", "{A67D9B60-34A5-462F-84A2-72C22F623749}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Spout", "Spout\Spout.csproj", "{347A531B-38DB-4848-9E4D-4E5E7F9C97E7}"
+EndProject
+Global
+	GlobalSection(SolutionConfigurationPlatforms) = preSolution
+		Debug|Any CPU = Debug|Any CPU
+		Release|Any CPU = Release|Any CPU
+	EndGlobalSection
+	GlobalSection(ProjectConfigurationPlatforms) = postSolution
+		{A67D9B60-34A5-462F-84A2-72C22F623749}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{A67D9B60-34A5-462F-84A2-72C22F623749}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{A67D9B60-34A5-462F-84A2-72C22F623749}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{A67D9B60-34A5-462F-84A2-72C22F623749}.Release|Any CPU.Build.0 = Release|Any CPU
+		{347A531B-38DB-4848-9E4D-4E5E7F9C97E7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{347A531B-38DB-4848-9E4D-4E5E7F9C97E7}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{347A531B-38DB-4848-9E4D-4E5E7F9C97E7}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{347A531B-38DB-4848-9E4D-4E5E7F9C97E7}.Release|Any CPU.Build.0 = Release|Any CPU
+	EndGlobalSection
+	GlobalSection(SolutionProperties) = preSolution
+		HideSolutionNode = FALSE
+	EndGlobalSection
+EndGlobal

Added: qpid/trunk/qpid/wcf/samples/Integration/Spout/Spout.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/samples/Integration/Spout/Spout.cs?rev=1026915&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/samples/Integration/Spout/Spout.cs (added)
+++ qpid/trunk/qpid/wcf/samples/Integration/Spout/Spout.cs Sun Oct 24 23:29:37 2010
@@ -0,0 +1,109 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Samples.Integration
+{
+    using System;
+    using System.Collections;
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using System.IO;
+    using System.ServiceModel;
+    using System.ServiceModel.Channels;
+    using System.ServiceModel.Description;
+    using System.Text;
+    using System.Xml;
+    using Apache.Qpid.Channel;
+    using Apache.Qpid.AmqpTypes;
+
+    class Spout
+    {
+        // Sends options.Count copies of options.Content to the target address
+        // (a count of 0 lifts the count limit), stopping once the deadline passes.
+        static void Main(string[] args)
+        {
+            try
+            {
+                Options options = new Options(args);
+
+                AmqpBinaryBinding binding = new AmqpBinaryBinding();
+                binding.BrokerHost = options.Broker;
+                binding.BrokerPort = options.Port;
+                binding.TransferMode = TransferMode.Streamed;
+
+                IChannelFactory<IOutputChannel> factory = binding.BuildChannelFactory<IOutputChannel>();
+                factory.Open();
+                try
+                {
+                    IOutputChannel sender = factory.CreateChannel(options.Address);
+                    sender.Open();
+
+                    MyRawBodyWriter.Initialize(options.Content);
+                    // NOTE(review): Options.Parse raises Timeout to at least 100 ms, so the
+                    // TimeSpan.Zero test below looks unreachable and the deadline always applies.
+                    DateTime deadline = DateTime.Now.Add(options.Timeout);
+                    int sent = 0;
+                    while (((sent < options.Count) || (options.Count == 0)) &&
+                        ((options.Timeout == TimeSpan.Zero) || (deadline > DateTime.Now)))
+                    {
+                        Message message = Message.CreateMessage(MessageVersion.None, "", new MyRawBodyWriter());
+                        AmqpProperties props = new AmqpProperties();
+                        props.ContentType = "text/plain";
+                        string id = Guid.NewGuid().ToString() + ":" + sent;
+                        props.PropertyMap.Add("spout-id", new AmqpString(id));
+                        message.Properties["AmqpProperties"] = props;
+                        sender.Send(message);
+                        sent++;
+                    }
+                }
+                finally
+                {
+                    factory.Close();
+                }
+            }
+            catch (Exception e)
+            {
+                Console.WriteLine("Spout: " + e);
+            }
+        }
+
+
+        // BodyWriter that emits one byte payload, shared by every instance,
+        // as the base64 content of an XML "Binary" element.
+        public class MyRawBodyWriter : BodyWriter
+        {
+            static byte[] payload;
+
+            public MyRawBodyWriter() : base(false) { }
+
+            // stores the UTF-8 bytes of content; call before any message is written
+            public static void Initialize(String content)
+            {
+                payload = Encoding.UTF8.GetBytes(content);
+            }
+
+            // invoked by the binary encoder when the message is written
+            protected override void OnWriteBodyContents(XmlDictionaryWriter writer)
+            {
+                writer.WriteStartElement("Binary");
+                writer.WriteBase64(payload, 0, payload.Length);
+            }
+        }
+    }
+}

Added: qpid/trunk/qpid/wcf/samples/Integration/Spout/Spout.csproj
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/samples/Integration/Spout/Spout.csproj?rev=1026915&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/samples/Integration/Spout/Spout.csproj (added)
+++ qpid/trunk/qpid/wcf/samples/Integration/Spout/Spout.csproj Sun Oct 24 23:29:37 2010
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="3.5" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+  <PropertyGroup>
+    <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+    <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+    <ProductVersion>9.0.21022</ProductVersion>
+    <SchemaVersion>2.0</SchemaVersion>
+    <ProjectGuid>{347A531B-38DB-4848-9E4D-4E5E7F9C97E7}</ProjectGuid>
+    <OutputType>Exe</OutputType>
+    <AppDesignerFolder>Properties</AppDesignerFolder>
+    <RootNamespace>Spout</RootNamespace>
+    <AssemblyName>Spout</AssemblyName>
+    <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+    <FileAlignment>512</FileAlignment>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+    <DebugSymbols>true</DebugSymbols>
+    <DebugType>full</DebugType>
+    <Optimize>false</Optimize>
+    <OutputPath>bin\Debug\</OutputPath>
+    <DefineConstants>DEBUG;TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+    <DebugType>pdbonly</DebugType>
+    <Optimize>true</Optimize>
+    <OutputPath>bin\Release\</OutputPath>
+    <DefineConstants>TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+  </PropertyGroup>
+  <ItemGroup>
+    <Reference Include="Apache.Qpid.Channel, Version=1.0.0.0, Culture=neutral, PublicKeyToken=679e1f50b62dbace, processorArchitecture=MSIL">
+      <SpecificVersion>False</SpecificVersion>
+      <HintPath>..\..\..\src\Apache\Qpid\Channel\bin\Release\Apache.Qpid.Channel.dll</HintPath>
+    </Reference>
+    <Reference Include="Apache.Qpid.Interop, Version=1.0.3796.12140, Culture=neutral, PublicKeyToken=679e1f50b62dbace, processorArchitecture=AMD64">
+      <SpecificVersion>False</SpecificVersion>
+      <HintPath>..\..\..\src\Apache\Qpid\Channel\bin\Release\Apache.Qpid.Interop.dll</HintPath>
+    </Reference>
+    <Reference Include="System" />
+    <Reference Include="System.Runtime.Serialization">
+      <RequiredTargetFramework>3.0</RequiredTargetFramework>
+    </Reference>
+    <Reference Include="System.ServiceModel">
+      <RequiredTargetFramework>3.0</RequiredTargetFramework>
+    </Reference>
+    <Reference Include="System.Xml" />
+  </ItemGroup>
+  <ItemGroup>
+    <Compile Include="..\Util\Options.cs" />
+    <Compile Include="Spout.cs" />
+  </ItemGroup>
+  <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+  <!-- To modify your build process, add your task inside one of the targets below and uncomment it. 
+       Other similar extension points exist, see Microsoft.Common.targets.
+  <Target Name="BeforeBuild">
+  </Target>
+  <Target Name="AfterBuild">
+  </Target>
+  -->
+</Project>

Added: qpid/trunk/qpid/wcf/samples/Integration/Util/Options.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/samples/Integration/Util/Options.cs?rev=1026915&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/samples/Integration/Util/Options.cs (added)
+++ qpid/trunk/qpid/wcf/samples/Integration/Util/Options.cs Sun Oct 24 23:29:37 2010
@@ -0,0 +1,157 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Samples.Integration
+{
+    using System;
+    using System.Collections;
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using System.IO;
+    using System.ServiceModel;
+    using System.ServiceModel.Channels;
+    using System.ServiceModel.Description;
+    using System.Text;
+    using System.Xml;
+
+    public class Options
+    {
+        private string broker;
+        private int port;
+        private int messageCount;
+        private EndpointAddress address;
+        private TimeSpan timeout;
+        private string content;
+
+        public Options(string[] args)
+        {
+            // defaults (timeout stays at its initial TimeSpan.Zero); Parse overrides them
+            this.messageCount = 1;
+            this.port = 5672;
+            this.broker = "127.0.0.1";
+            Parse(args);
+        }
+
+        // Parses "[--count N] [--broker HOST] [--port N] [--timeout SECONDS]
+        // [--content TEXT] address": each option takes one value and the last
+        // argument is the address. Throws ArgumentException for an unknown option
+        // or a missing address; a malformed number surfaces from int.Parse.
+        private void Parse(string[] args)
+        {
+            int argCount = args.Length;
+            int current = 0;
+
+            // the last argument is reserved for the address, so stop one short of it
+            while ((current + 1) < argCount)
+            {
+                string name = args[current];
+                string value = args[++current];
+                switch (name)
+                {
+                    case "--count":
+                        // negative values are ignored and the default is kept
+                        int count = int.Parse(value);
+                        if (count >= 0)
+                        {
+                            this.messageCount = count;
+                        }
+                        break;
+                    case "--broker":
+                        this.broker = value;
+                        break;
+                    case "--port":
+                        // zero and negative values are ignored and the default is kept
+                        int portNumber = int.Parse(value);
+                        if (portNumber > 0)
+                        {
+                            this.port = portNumber;
+                        }
+                        break;
+                    case "--timeout":
+                        // whole seconds; zero and negative values are ignored
+                        int seconds = int.Parse(value);
+                        if (seconds > 0)
+                        {
+                            this.timeout = TimeSpan.FromSeconds(seconds);
+                        }
+                        break;
+                    case "--content":
+                        this.content = value;
+                        break;
+                    default:
+                        throw new ArgumentException(String.Format("unknown argument \"{0}\"", name));
+                }
+                current++;
+            }
+
+            // A dangling option name (e.g. a lone "--count") would otherwise be
+            // taken as the address; report it as a missing address instead.
+            if (current == argCount || args[current].StartsWith("--", StringComparison.Ordinal))
+            {
+                throw new ArgumentException("missing argument: address");
+            }
+
+            address = new EndpointAddress("amqp:" + args[current]);
+
+            if (timeout < TimeSpan.FromMilliseconds(100))
+            {
+                // WCF timeout of 0 really means no time for even a single message transfer
+                timeout = TimeSpan.FromMilliseconds(100);
+            }
+        }
+
+        // Endpoint built from the final command-line argument, prefixed with "amqp:".
+        public EndpointAddress Address
+        {
+            get { return this.address; }
+        }
+
+        // Broker host; "127.0.0.1" unless --broker was given.
+        public string Broker
+        {
+            get { return this.broker; }
+        }
+
+        // Message text from --content. Never null: an empty string is
+        // returned when the option was not given.
+        public string Content
+        {
+            get { return this.content ?? String.Empty; }
+        }
+
+        // Number of messages from --count; defaults to 1.
+        public int Count
+        {
+            get { return this.messageCount; }
+        }
+
+        // Broker port; 5672 unless a positive --port was given.
+        public int Port
+        {
+            get { return this.port; }
+        }
+
+        // Timeout taken from --timeout (whole seconds). Parse raises any
+        // value below 100 ms, including the default of zero, to 100 ms.
+        public TimeSpan Timeout
+        {
+            get { return this.timeout; }
+        }
+    }
+}

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs?rev=1026915&r1=1026914&r2=1026915&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs Sun Oct 24 23:29:37 2010
@@ -29,7 +29,7 @@ namespace Apache.Qpid.AmqpTypes
         // AMQP 0-10 delivery properties
         private bool durable;
         private Nullable<TimeSpan> timeToLive;
-        private string routingKey;
+        private string subject;
 
         // AMQP 0-10 message properties
         private string replyToExchange;
@@ -50,7 +50,7 @@ namespace Apache.Qpid.AmqpTypes
         {
             get
             {
-                return ((this.routingKey != null) || this.durable || this.timeToLive.HasValue);
+                return ((this.subject != null) || this.durable || this.timeToLive.HasValue);
             }
         }
 
@@ -163,10 +163,19 @@ namespace Apache.Qpid.AmqpTypes
             set { this.timeToLive = value; }
         }
 
+        /// <summary>
+        /// Obsolete: switch to AMQP 1.0 "Subject" naming
+        /// </summary>
         public string RoutingKey
         {
-            get { return this.routingKey; }
-            set { this.routingKey = value; }
+            get { return this.subject; }
+            set { this.subject = value; }
+        }
+
+        public string Subject
+        {
+            get { return this.subject; }
+            set { this.subject = value; }
         }
 
         public string ReplyToExchange
@@ -200,7 +209,7 @@ namespace Apache.Qpid.AmqpTypes
         public void Clear()
         {
             this.timeToLive = null;
-            this.routingKey = null;
+            this.subject = null;
             this.replyToRoutingKey = null;
             this.replyToExchange = null;
             this.durable = false;
@@ -251,9 +260,9 @@ namespace Apache.Qpid.AmqpTypes
                 this.replyToRoutingKey = other.replyToRoutingKey;
             }
 
-            if (other.routingKey != null)
+            if (other.subject != null)
             {
-                this.routingKey = other.routingKey;
+                this.subject = other.subject;
             }
 
             if (other.durable)

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs?rev=1026915&r1=1026914&r2=1026915&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs Sun Oct 24 23:29:37 2010
@@ -32,6 +32,7 @@ namespace Apache.Qpid.Channel
     using System.Text;
     using System.Threading;
     using System.Globalization;
+    using System.Web;
     using System.Xml;
 
     // the thin interop layer that provides access to the Qpid AMQP client libraries
@@ -52,11 +53,11 @@ namespace Apache.Qpid.Channel
         private bool shared;
         private int prefetchLimit;
         private string encoderContentType;
+        // AMQP subject/routing key
+        private string subject;
+        // Qpid addressing value for "qpid.subject" property
+        private string qpidSubject;
 
-        // input = 0-10 queue, output = 0-10 exchange
-        private string queueName;
-
-        private String routingKey;
         private BufferManager bufferManager;
         private AmqpProperties outputMessageProperties;
 
@@ -85,7 +86,7 @@ namespace Apache.Qpid.Channel
             this.remoteAddress = remoteAddress;
 
             // pull out host, port, queue, and connection arguments
-            this.ParseAmqpUri(remoteAddress.Uri);
+            string qpidAddress = this.UriToQpidAddress(remoteAddress.Uri, out subject);
 
             this.encoder = msgEncoder;
             string ct = String.Empty;
@@ -129,12 +130,14 @@ namespace Apache.Qpid.Channel
 
             if (this.isInputChannel)
             {
-                this.inputLink = ConnectionManager.GetInputLink(this.factoryChannelProperties, shared, false, this.queueName);
+                this.inputLink = ConnectionManager.GetInputLink(this.factoryChannelProperties, shared, false, qpidAddress);
                 this.inputLink.PrefetchLimit = this.prefetchLimit;
             }
             else
             {
-                this.outputLink = ConnectionManager.GetOutputLink(this.factoryChannelProperties, shared, false, this.queueName);
+                this.outputLink = ConnectionManager.GetOutputLink(this.factoryChannelProperties, shared, false, qpidAddress);
+                this.subject = this.outputLink.DefaultSubject;
+                this.qpidSubject = this.outputLink.QpidSubject;
             }
         }
 
@@ -423,9 +426,14 @@ namespace Apache.Qpid.Channel
                     outgoingProperties.MergeFrom(this.factoryChannelProperties.DefaultMessageProperties);
                 }
 
-                if (this.routingKey != null)
+                if (this.subject != null)
                 {
-                    outgoingProperties.RoutingKey = this.routingKey;
+                    outgoingProperties.RoutingKey = this.subject;
+                }
+
+                if (this.qpidSubject != null)
+                {
+                    outgoingProperties.PropertyMap["qpid.subject"] = new AmqpString(this.qpidSubject);
                 }
 
                 // Add the Properties set by the application on this particular message.
@@ -544,8 +552,7 @@ namespace Apache.Qpid.Channel
             this.bufferManager.Clear();
         }
 
-        // "amqp:queue1" | "amqp:stocks@broker1.com" | "amqp:queue3?routingkey=key"
-        private void ParseAmqpUri(Uri uri)
+        private string UriToQpidAddress(Uri uri, out string subject) // maps an amqp: URI to a Qpid address string; subject is "" when none is given
         {
             if (uri.Scheme != AmqpConstants.Scheme)
             {
@@ -553,43 +560,83 @@ namespace Apache.Qpid.Channel
                     "The scheme {0} specified in address is not supported.", uri.Scheme), "uri");
             }
 
-            this.queueName = uri.LocalPath;
+            subject = "";
+            string path = uri.LocalPath;
+            string query = uri.Query;
+
+            // Legacy form: rewrite old style "myqueue?routingkey=key" (or "?subject=key") as "myqueue/key".
 
-            if ((this.queueName.IndexOf('@') != -1) && this.isInputChannel)
+            if (query.Length > 0)
             {
-                throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
-                    "Invalid input queue name: \"{0}\" specified.", this.queueName), "uri");
-            }
+                if (!query.StartsWith("?"))
+                {
+                    throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
+                        "Invalid query argument."), "uri");
+                }
 
-            // search out session parameters in the query portion of the URI
+                string routingParseKey = "routingkey=";
+                string subjectParseKey = "subject=";
+                char[] charSeparators = new char[] { '?', ';' };
+                string[] args = uri.Query.Split(charSeparators, StringSplitOptions.RemoveEmptyEntries);
+                foreach (string s in args)
+                {
+                    if (s.StartsWith(routingParseKey))
+                    {
+                        subject = s.Substring(routingParseKey.Length);
+                    }
+                    else if (s.StartsWith(subjectParseKey))
+                    {
+                        subject = s.Substring(subjectParseKey.Length);
+                    }
+                    else
+                    {
+                        if (s.Length > 0)
+                        {
+                            throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
+                            "Invalid query argument {0}.", s), "uri");
+                        }
+                    }
+                }
 
-            string routingParseKey = "routingkey=";
-            char[] charSeparators = new char[] { '?', ';' };
-            string[] args = uri.Query.Split(charSeparators, StringSplitOptions.RemoveEmptyEntries);
-            foreach (string s in args)
-            {
-                if (s.StartsWith(routingParseKey))
+                if (path.Contains("/")) // a legacy URI may not also carry a new style "/subject"
+                {
+                    throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
+                        "Invalid queue name {0}.", path), "uri");
+                }
+
+                if (path.Length == 0)
                 {
-                    this.routingKey = s.Substring(routingParseKey.Length);
+                    // special case, user wants default exchange
+                    return "//" + subject;
                 }
+
+                return path + "/" + subject;
             }
 
-            if (this.queueName == String.Empty)
+            // New form: the subject is the text between the first '/' and any ';' in "myqueue/mysubject;{mode:browse}".
+            int pos = path.IndexOf('/');
+            if ((pos > -1) && (pos < path.Length - 1)) // a trailing '/' means there is no subject
             {
-                if (this.isInputChannel)
+                subject = path.Substring(pos + 1); // skip the '/' itself so the subject has no leading slash
+                pos = subject.IndexOf(';');
+                if (pos == 0) // "myqueue/;{...}": options follow the '/' immediately
                 {
                     throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
-                        "Empty queue target specifier not allowed."), "uri");
+                        "Empty subject in address {0}.", path), "uri");
                 }
-                else
+
+                if (pos > 0)
                 {
-                    if (this.routingKey == null)
-                    {
-                        throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
-                        "No target queue or routing key specified."), "uri");
-                    }
+                    subject = subject.Substring(0, pos); // drop the ";{options}" tail
                 }
             }
+
+            if (subject.Length > 0)
+            {
+                subject = HttpUtility.UrlDecode(subject);
+            }
+
+            return HttpUtility.UrlDecode(path); // the whole path (node, subject and options) is the Qpid address
         }
     }
 }

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj?rev=1026915&r1=1026914&r2=1026915&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj Sun Oct 24 23:29:37 2010
@@ -97,6 +97,7 @@ under the License.
       <RequiredTargetFramework>3.0</RequiredTargetFramework>
     </Reference>
     <Reference Include="System.Transactions" />
+    <Reference Include="System.Web" />
     <Reference Include="System.XML" />
   </ItemGroup>
   <ItemGroup>

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp?rev=1026915&r1=1026914&r2=1026915&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp Sun Oct 24 23:29:37 2010
@@ -357,17 +357,20 @@ bool AmqpSession::MessageStop(std::strin
     return true;
 }
 
-void AmqpSession::AcceptAndComplete(SequenceSet& transfers)
+void AmqpSession::AcceptAndComplete(SequenceSet& transfers, bool browsing) // browsing: caller's link is in browse mode
 {
     lock l(sessionLock);
 
-    // delimit with session dtx commands depending on the transaction context
-    UpdateTransactionState(%l);
+    if (!browsing) {
+	// delimit with session dtx commands depending on the transaction context
+	UpdateTransactionState(%l);
+    }
 
     CheckOpen();
 
     sessionp->markCompleted(transfers, false);
-    sessionp->messageAccept(transfers, false);
+    if (!browsing) // browse mode skips the accept; the transfers are still marked completed above
+	sessionp->messageAccept(transfers, false);
 }
 
 
@@ -609,4 +612,22 @@ void AmqpSession::ReleaseCompletion(IntP
     delete completion.ToPointer();
 }
 
+
+// Non-exclusive borrowing of the native session for a "brief" period, i.e. several
+// synced commands (address resolution).  Pair each successful call with ReturnNativeSession().
+
+IntPtr AmqpSession::BorrowNativeSession() {
+    lock l(sessionLock);
+    if (closing)
+	return IntPtr::Zero;	// nothing was borrowed
+
+    IncrementSyncs();		// undone by DecrementSyncs() in ReturnNativeSession()
+    return (IntPtr) sessionp;
+}
+
+void AmqpSession::ReturnNativeSession() { // ends a borrow started by a successful BorrowNativeSession()
+    lock l(sessionLock);
+    DecrementSyncs();	// balances the IncrementSyncs() done when the session was borrowed
+}
+
 }}} // namespace Apache::Qpid::Cli

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h?rev=1026915&r1=1026914&r2=1026915&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h Sun Oct 24 23:29:37 2010
@@ -78,7 +78,7 @@ public:  
     OutputLink^ CreateOutputLink(System::String^ targetQueue);
     InputLink^ CreateInputLink(System::String^ sourceQueue);
 
-    // 0-10 specific support
+    // 0-10 specific support; deprecated in favor of Qpid messaging addresses
     InputLink^ CreateInputLink(System::String^ sourceQueue, bool exclusive, bool temporary, System::String^ filterKey, System::String^ exchange);
     void Bind(System::String^ queue, System::String^ exchange, System::String^ filterKey);
 
@@ -90,7 +90,7 @@ internal:
     void internalWaitForCompletion(IntPtr future);
     void removeWaiter(CompletionWaiter^ waiter);
     bool MessageStop(std::string &name);
-    void AcceptAndComplete(SequenceSet& transfers);
+    void AcceptAndComplete(SequenceSet& transfers, bool browsing);
     IntPtr BeginPhase0Flush(XaTransaction^);
     void EndPhase0Flush(XaTransaction^, IntPtr);
     IntPtr DtxStart(IntPtr xidp, bool, bool);
@@ -98,6 +98,8 @@ internal:
     IntPtr DtxCommit(IntPtr xidp, bool onePhase);
     IntPtr DtxRollback(IntPtr xidp);
     void ReleaseCompletion(IntPtr completion);
+    IntPtr BorrowNativeSession();
+    void ReturnNativeSession();
 
     property AmqpConnection^ Connection {
 	AmqpConnection^ get () { return connection; }

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp?rev=1026915&r1=1026914&r2=1026915&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp Sun Oct 24 23:29:37 2010
@@ -88,9 +88,12 @@ InputLink::InputLink(AmqpSession^ sessio
     waiters = gcnew Collections::Generic::List<MessageWaiter^>();
     linkLock = waiters;  // private and available
     subscriptionLock = gcnew Object();
+    qpidAddress = QpidAddress::CreateAddress(sourceQueue, true);
+    qpidAddress->ResolveLink(session);
+    browsing = qpidAddress->Browsing;
 
     try {
-	std::string qname = QpidMarshal::ToNative(sourceQueue);
+	std::string qname = QpidMarshal::ToNative(qpidAddress->LinkName);
 
 	if (temporary) {
 	    qpidSessionp->queueDeclare(arg::queue=qname, arg::durable=false, arg::autoDelete=true, arg::exclusive=true);
@@ -104,6 +107,15 @@ InputLink::InputLink(AmqpSession^ sessio
 	settings.flowControl = FlowControl::messageCredit(0);
 	settings.completionMode = CompletionMode::MANUAL_COMPLETION;
 
+	if (browsing) {
+	    settings.acquireMode = AcquireMode::ACQUIRE_MODE_NOT_ACQUIRED;
+	    settings.acceptMode = AcceptMode::ACCEPT_MODE_NONE;
+	}
+	else {
+	    settings.acquireMode = AcquireMode::ACQUIRE_MODE_PRE_ACQUIRED;
+	    settings.acceptMode = AcceptMode::ACCEPT_MODE_EXPLICIT;
+	}
+
 	Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, settings);
 	subscriptionp = new Subscription (sub); // copy smart pointer for later IDisposable cleanup
 
@@ -186,8 +198,10 @@ void InputLink::Cleanup()
 	{
 	    ReleaseNative();
 	}
-
     }
+
+    // Now that subscription is torn down, we can execute pending delete on remote node
+    qpidAddress->CleanupLink(amqpSession);
     amqpSession->NotifyClosed();
 }
 
@@ -699,7 +713,7 @@ AmqpMessage^ InputLink::createAmqpMessag
 	
 	// subscriptionp->accept(frameSetID) is a slow sync operation in the native API
 	// so do it within the AsyncSession directly
-	amqpSession->AcceptAndComplete(frameSetID);
+	amqpSession->AcceptAndComplete(frameSetID, browsing);
 
 	workingCredit--;
 	// check if more messages need to be requested from broker

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h?rev=1026915&r1=1026914&r2=1026915&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h Sun Oct 24 23:29:37 2010
@@ -20,6 +20,7 @@
 #pragma once
 
 #include "MessageWaiter.h"
+#include "QpidAddress.h"
 
 namespace Apache {
 namespace Qpid {
@@ -58,6 +59,9 @@ private:
     // working credit low water mark
     int minWorkingCredit;
 
+    bool browsing;
+    QpidAddress^ qpidAddress;
+
     void Cleanup();
     void ReleaseNative();
     bool haveMessage();
@@ -97,6 +101,10 @@ public:  
 	void set (int value);
     }
 
+    property bool Browsing { // read-only: true when the link's address asked for browse mode
+	bool get () { return browsing; }
+    }
+
 };
 
 }}} // namespace Apache::Qpid::Interop

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj?rev=1026915&r1=1026914&r2=1026915&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj Sun Oct 24 23:29:37 2010
@@ -409,6 +409,10 @@
 				>
 			</File>
 			<File
+				RelativePath=".\QpidAddress.cpp"
+				>
+			</File>
+			<File
 				RelativePath=".\InputLink.cpp"
 				>
 			</File>
@@ -455,6 +459,10 @@
 				>
 			</File>
 			<File
+				RelativePath=".\QpidAddress.h"
+				>
+			</File>
+			<File
 				RelativePath=".\InputLink.h"
 				>
 			</File>

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp?rev=1026915&r1=1026914&r2=1026915&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp Sun Oct 24 23:29:37 2010
@@ -48,13 +48,14 @@ using namespace std;
 using namespace Apache::Qpid::AmqpTypes;
 
 
-OutputLink::OutputLink(AmqpSession^ session, String^ defaultQueue) :
+OutputLink::OutputLink(AmqpSession^ session, String^ address) :
     amqpSession(session),
-    queue(defaultQueue),
     disposed(false),
     maxFrameSize(session->Connection->MaxFrameSize),
     finalizing(false)
 {
+    qpidAddress = QpidAddress::CreateAddress(address, false);	// false: this is an output (sender) link
+    qpidAddress->ResolveLink(session);	// looks the node up on the broker; throws if it cannot be resolved
 }
 
 void OutputLink::Cleanup()
@@ -67,6 +68,8 @@ void OutputLink::Cleanup()
 	disposed = true;
     }
 
+    // process any pending queue delete
+    qpidAddress->CleanupLink(amqpSession);
     amqpSession->NotifyClosed();
 }
 
@@ -217,7 +220,8 @@ void OutputLink::Send(AmqpMessage^ amqpM
     ManagedToNative(amqpMessage);
 
     MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream;
-    CompletionWaiter^ waiter = amqpSession->SendMessage(queue, messageBodyStream, timeout, false, nullptr, nullptr);
+    CompletionWaiter^ waiter = amqpSession->SendMessage(qpidAddress->LinkName, messageBodyStream,
+							timeout, false, nullptr, nullptr);
 
     if (waiter != nullptr) {
 	waiter->WaitForCompletion();
@@ -234,7 +238,7 @@ IAsyncResult^ OutputLink::BeginSend(Amqp
     ManagedToNative(amqpMessage);
 
     MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream;
-    CompletionWaiter^ waiter = amqpSession->SendMessage(queue, messageBodyStream, timeout, true, callback, state);
+    CompletionWaiter^ waiter = amqpSession->SendMessage(qpidAddress->LinkName, messageBodyStream, timeout, true, callback, state);
     return waiter;
 }
 

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h?rev=1026915&r1=1026914&r2=1026915&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h Sun Oct 24 23:29:37 2010
@@ -19,6 +19,8 @@
 
 #pragma once
 
+#include "QpidAddress.h"
+
 namespace Apache {
 namespace Qpid {
 namespace Interop {
@@ -34,7 +36,7 @@ public ref class OutputLink
 {
 private:
     AmqpSession^ amqpSession;
-    String^ queue;
+    QpidAddress^ qpidAddress;
     bool disposed;
     bool finalizing;
     void Cleanup();
@@ -58,6 +60,15 @@ public:  
 	AmqpTypes::AmqpProperties^ get () { return defaultProperties; }
 	void set(AmqpTypes::AmqpProperties^ p) { defaultProperties = p; }
     }
+
+    property String^ DefaultSubject { // 0-10 routing key from the link's address; nullptr when there is no address
+	String^ get() { return (qpidAddress == nullptr) ? nullptr : qpidAddress->RoutingKey; }
+    }
+
+    property String^ QpidSubject { // subject parsed from the link's address; nullptr when there is no address
+	String^ get() { return (qpidAddress == nullptr) ? nullptr : qpidAddress->Subject; }
+    }
+
 };
 
 

Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp?rev=1026915&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp Sun Oct 24 23:29:37 2010
@@ -0,0 +1,304 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+
+/*
+ * This program parses strings of the form "node/subject;{options}" as
+ * used in the Qpid messaging API.  It provides basic wiring
+ * capabilities to create/delete temporary queues (to topic
+ * subscriptions) and unbound "point and shoot" queues.
+ */
+
+
+#include <windows.h>
+#include <msclr\lock.h>
+#include <oletx2xa.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+#include "qpid/client/Message.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/client/Future.h"
+
+#include "AmqpConnection.h"
+#include "AmqpSession.h"
+#include "AmqpMessage.h"
+#include "MessageBodyStream.h"
+#include "InputLink.h"
+#include "OutputLink.h"
+#include "QpidMarshal.h"
+#include "QpidException.h"
+#include "QpidAddress.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace msclr;
+
+using namespace qpid::client;
+using namespace std;
+
+QpidAddress::QpidAddress(String^ s, bool isInput) { // parses "node/subject;{options}"; no broker access here
+    address = s;
+    nodeName = s;
+    isInputChannel = isInput;
+    isQueue = true;
+
+    if (address->StartsWith("//")) {
+	// special case old style address to default exchange,
+	// no options, output only
+	if ((s->IndexOf(';') != -1) || isInputChannel)
+		throw gcnew ArgumentException("Invalid 0-10 address: " + address);
+	nodeName = nodeName->Substring(2);	// strip the leading "//"
+	return;
+    }
+
+    String^ options = nullptr;
+    int pos = s->IndexOf(';');
+    if (pos != -1) {
+	options = s->Substring(pos + 1);
+	nodeName = s->Substring(0, pos);
+
+	if (options->Length > 0) {
+	    if (!options->StartsWith("{") || !options->EndsWith("}"))
+		throw gcnew ArgumentException("Invalid address: " + address);
+	    options = options->Substring(1, options->Length - 2);	// drop the enclosing braces
+	    array<String^>^ subOpts = options->Split(String(",: ").ToCharArray(), StringSplitOptions::RemoveEmptyEntries);	// flat list: name, value, name, value, ...
+
+	    if ((subOpts->Length % 2) != 0)	// every option name needs exactly one value
+		throw gcnew ArgumentException("Bad address (options): " + address);
+
+	    for (int i=0; i < subOpts->Length; i += 2) {
+		String^ opt = subOpts[i];
+		String^ optArg = subOpts[i+1];
+		if (opt->Equals("create")) {
+		    creating = PolicyApplies(optArg);
+		}
+		else if (opt->Equals("delete")) {
+		    deleting = PolicyApplies(optArg);
+		}
+		else if (opt->Equals("mode")) {
+		    if (optArg->Equals("browse")) {
+			browsing = isInputChannel;	// browse only takes effect on input links
+		    }
+		    else if (!optArg->Equals("consume")) {
+			throw gcnew ArgumentException("Invalid browsing option: " + optArg);
+		    }
+		}
+		else if (opt->Equals("assert") || opt->Equals("node")) {
+		    throw gcnew ArgumentException("Unsupported address option: " + opt);
+		}
+		else {
+		    throw gcnew ArgumentException("Bad address option: " + opt);
+		}
+	    }
+	}
+	else
+	    options = nullptr;
+    }
+
+    pos = nodeName->IndexOf('/');
+    if (pos != -1) {
+	subject = nodeName->Substring(pos + 1);	// text after the first '/'
+	if (String::IsNullOrEmpty(subject))
+	    subject = nullptr;	// "node/" is treated as having no subject
+	nodeName = nodeName->Substring(0, pos);
+    }
+}
+
+
+QpidAddress^ QpidAddress::CreateAddress(String^ s, bool isInput) { // factory: parses s; the result still needs ResolveLink()
+    QpidAddress^ addr = gcnew QpidAddress(s, isInput);
+    return addr;
+}
+
+
+void QpidAddress::ResolveLink(AmqpSession^ amqpSession) { // looks nodeName up on the broker and sets linkName/routingKey/deleteName
+
+    AsyncSession* asyncSessionp = (AsyncSession *) amqpSession->BorrowNativeSession().ToPointer();
+    if (asyncSessionp == NULL)
+	throw gcnew ObjectDisposedException("session");
+
+    deleteName = nullptr;
+    isQueue = true;
+
+    try {
+	Session session = sync(*asyncSessionp);
+	std::string n_name = QpidMarshal::ToNative(nodeName);
+        ExchangeBoundResult result = session.exchangeBound(arg::exchange=n_name, arg::queue=n_name);	// one query tests the name both as exchange and as queue
+
+	bool queueFound = !result.getQueueNotFound();
+	bool exchangeFound = !result.getExchangeNotFound();
+
+	if (isInputChannel) {
+
+	    if (queueFound) {	// a queue takes precedence over an exchange of the same name
+		linkName = nodeName;
+		if (deleting)
+		    deleteName = nodeName;
+	    }
+	    else if (exchangeFound) {	// exchange: receive through a private temp queue bound to it
+		isQueue = false;
+		String^ tmpkey = nullptr;
+		String^ tmpname = nodeName + "_" + Guid::NewGuid().ToString();
+		bool haveSubject = !String::IsNullOrEmpty(subject);
+		FieldTable bindArgs;
+
+		std::string exchangeType = session.exchangeQuery(n_name).getType();	// binding key/arguments depend on the exchange type
+		if (exchangeType == "topic") {
+		    tmpkey = haveSubject ? subject : "#";
+		}
+		else if (exchangeType == "fanout") {
+		    tmpkey = tmpname;
+		}
+		else if (exchangeType == "headers") {
+		    tmpkey = haveSubject ? subject : "match-all";
+		    if (haveSubject)
+			bindArgs.setString("qpid.subject", QpidMarshal::ToNative(subject));
+		    bindArgs.setString("x-match", "all");
+		}
+		else if (exchangeType == "xml") {
+		    tmpkey = haveSubject ? subject : "";
+		    if (haveSubject) {
+			String^ v = "declare variable $qpid.subject external; $qpid.subject = '" +
+			    subject + "'";	// NOTE(review): subject is spliced into the XQuery unescaped
+			bindArgs.setString("xquery", QpidMarshal::ToNative(v));
+		    }
+		    else
+			bindArgs.setString("xquery", "true()");
+		}
+		else {
+		    tmpkey = haveSubject ? subject : "";
+		}
+
+		std::string qn = QpidMarshal::ToNative(tmpname);
+		session.queueDeclare(arg::queue=qn, arg::autoDelete=true, arg::exclusive=true);
+		bool success = false;
+		try {
+		    session.exchangeBind(arg::exchange=n_name, arg::queue=qn, 
+					 arg::bindingKey=QpidMarshal::ToNative(tmpkey),
+					 arg::arguments=bindArgs);
+		    bindKey = tmpkey; // remember for later cleanup
+		    success = true;
+		}
+		finally {
+		    if (!success)	// bind failed: remove the temp queue that was just declared
+			session.queueDelete(arg::queue=qn);
+		}
+		linkName = tmpname;
+		deleteName = tmpname;
+		deleting = true;	// the temp queue is always scheduled for deletion
+	    }
+	    else if (creating) {
+		// only create "point and shoot" queues for now
+		session.queueDeclare(arg::queue=QpidMarshal::ToNative(nodeName));
+		// leave unbound
+
+		linkName = nodeName;
+
+		if (deleting)
+		    deleteName = nodeName;
+	    }
+	    else {
+		throw gcnew ArgumentException("AMQP broker node not found: " + nodeName);
+	    }
+	}
+	else {
+	    // Output channel
+
+	    bool oldStyleUri = address->StartsWith("//");	// old style addresses never resolve to an exchange
+
+	    if (queueFound) {
+		linkName = "";	// default exchange for point and shoot
+		routingKey = nodeName;
+		if (deleting)
+		    deleteName = nodeName;
+	    }
+	    else if (exchangeFound && !oldStyleUri) {
+		isQueue = false;
+		linkName = nodeName;
+		routingKey = subject;
+	    }
+	    else if (creating) {
+		// only create "point and shoot" queues for now
+		session.queueDeclare(arg::queue=QpidMarshal::ToNative(nodeName));
+		// leave unbound
+		linkName = "";
+		routingKey = nodeName;
+		if (deleting)
+		    deleteName = nodeName;
+	    }
+	    else {
+		throw gcnew ArgumentException("AMQP broker node not found: " + nodeName);
+	    }
+	}
+    }
+    finally {
+	amqpSession->ReturnNativeSession();	// always end the borrow, even when resolution throws
+    }
+}
+
+void QpidAddress::CleanupLink(AmqpSession^ amqpSession) { // best-effort delete of the queue named by deleteName, if any
+    if (deleteName == nullptr)
+	return;
+
+    AsyncSession* asyncSessionp = (AsyncSession *) amqpSession->BorrowNativeSession().ToPointer();
+    if (asyncSessionp == NULL) {
+	// TODO: log it: can't undo tear down actions
+	return;
+    }
+
+    try {
+	Session session = sync(*asyncSessionp);
+	std::string q = QpidMarshal::ToNative(deleteName);
+	if (isInputChannel && !isQueue) {
+	    // undo the temp wiring to the topic
+	    session.exchangeUnbind(arg::exchange=QpidMarshal::ToNative(nodeName), arg::queue=q,
+				   arg::bindingKey=QpidMarshal::ToNative(bindKey));
+	}
+	session.queueDelete(q);
+    }
+    catch (Exception^ e) {
+	// TODO: log it (the failure is deliberately swallowed for now)
+    }
+    finally {
+	amqpSession->ReturnNativeSession();
+    }
+}
+
+bool QpidAddress::PolicyApplies(String^ mode) { // does a create/delete policy word apply to this link's direction?
+    if (mode->Equals("always"))
+	return true;
+    if (mode->Equals("sender"))
+	return !isInputChannel;	// senders are the output links
+    if (mode->Equals("receiver"))
+	return isInputChannel;
+    if (mode->Equals("never"))
+	return false;
+
+    throw gcnew ArgumentException(String::Format("Bad address option {0} for {1}", mode, address));	// unknown policy word
+}
+
+}}} // namespace Apache::Qpid::Interop

Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h?rev=1026915&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h Sun Oct 24 23:29:37 2010
@@ -0,0 +1,89 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+#include "MessageWaiter.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Threading;
+using namespace System::Runtime::InteropServices;
+
+using namespace qpid::client;
+using namespace std;
+
+
+public ref class QpidAddress	// a parsed "node/subject;{options}" address plus the broker wiring resolved for one link
+{
+private:
+    QpidAddress(String^ address, bool isInput);
+
+    // the original Qpid messaging address string, with WCF uri semantics removed, and URL decoded
+    String^ address;
+
+    String^ nodeName;
+    // "qpid.subject"
+    String^ subject;
+    // 0-10 routing key (Output channels only)
+    String^ routingKey;
+
+    String^ linkName;	// set by ResolveLink: queue to subscribe to (input) or send destination, "" = default exchange (output)
+    String^ deleteName;	// queue that CleanupLink deletes; nullptr when nothing is pending
+    String^ bindKey;	// binding key of the temp queue binding, kept so CleanupLink can unbind it
+
+    // node type: queue/topic
+    bool isQueue;
+
+    // direction
+    bool isInputChannel;
+
+    bool creating;	// "create" policy applies to this link
+    bool deleting;	// "delete" policy applies to this link (also set for temp queues)
+    bool browsing;	// "mode: browse" was requested on an input link
+
+    bool PolicyApplies(String^ mode);
+
+internal:
+    static QpidAddress^ CreateAddress(String ^s, bool isInput);
+    void ResolveLink(AmqpSession^ amqpSession);
+    void CleanupLink(AmqpSession^ amqpSession);
+
+    property String^ LinkName {
+	String^ get () { return linkName; }
+    }
+
+    property String^ Subject {
+	String^ get () { return subject; }
+    }
+
+    property String^ RoutingKey {
+	String^ get () { return routingKey; }
+    }
+
+    property bool Browsing {
+	bool get () { return browsing; }
+    }
+
+};
+
+}}} // namespace Apache::Qpid::Interop



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org