You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2010/10/25 01:29:38 UTC
svn commit: r1026915 - in /qpid/trunk/qpid/wcf: samples/Integration/
samples/Integration/Drain/ samples/Integration/Spout/
samples/Integration/Util/ src/Apache/Qpid/AmqpTypes/
src/Apache/Qpid/Channel/ src/Apache/Qpid/Interop/
Author: cliffjansen
Date: Sun Oct 24 23:29:37 2010
New Revision: 1026915
URL: http://svn.apache.org/viewvc?rev=1026915&view=rev
Log:
QPID-2646 patches
Added:
qpid/trunk/qpid/wcf/samples/Integration/
qpid/trunk/qpid/wcf/samples/Integration/Drain/
qpid/trunk/qpid/wcf/samples/Integration/Drain/Drain.cs
qpid/trunk/qpid/wcf/samples/Integration/Drain/Drain.csproj
qpid/trunk/qpid/wcf/samples/Integration/Integration.sln
qpid/trunk/qpid/wcf/samples/Integration/Spout/
qpid/trunk/qpid/wcf/samples/Integration/Spout/Spout.cs
qpid/trunk/qpid/wcf/samples/Integration/Spout/Spout.csproj
qpid/trunk/qpid/wcf/samples/Integration/Util/
qpid/trunk/qpid/wcf/samples/Integration/Util/Options.cs
qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp
qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h
Modified:
qpid/trunk/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs
qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj
qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h
qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h
qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp
qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h
Added: qpid/trunk/qpid/wcf/samples/Integration/Drain/Drain.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/samples/Integration/Drain/Drain.cs?rev=1026915&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/samples/Integration/Drain/Drain.cs (added)
+++ qpid/trunk/qpid/wcf/samples/Integration/Drain/Drain.cs Sun Oct 24 23:29:37 2010
@@ -0,0 +1,146 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Samples.Integration
+{
+ using System;
+ using System.Collections;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using System.IO;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using System.ServiceModel.Description;
+ using System.Text;
+ using System.Xml;
+ using Apache.Qpid.Channel;
+ using Apache.Qpid.AmqpTypes;
+
+ class Drain
+ {
+ // delimit multiple values
+ private static void Append(StringBuilder sb, string s)
+ {
+ if (sb.Length > 0)
+ {
+ sb.Append(", ");
+ }
+
+ sb.Append(s);
+ }
+
+ private static string MessagePropertiesAsString(AmqpProperties props)
+ {
+ StringBuilder sb = new StringBuilder();
+
+ if (props.PropertyMap != null)
+ {
+ foreach (KeyValuePair<string, AmqpType> kvp in props.PropertyMap)
+ {
+ string propval;
+ if (kvp.Value is AmqpString)
+ {
+ AmqpString amqps = (AmqpString)kvp.Value;
+ propval = amqps.Value;
+ }
+ else
+ {
+ propval = kvp.Value.ToString();
+ }
+
+ Append(sb, kvp.Key + ":" + propval);
+ }
+ }
+
+ return sb.ToString();
+ }
+
+ private static string MessageContentAsString(Message msg, AmqpProperties props)
+ {
+ // AmqpBinaryBinding provides message content as a single XML "Binary" element
+ XmlDictionaryReader reader = msg.GetReaderAtBodyContents();
+ while (!reader.HasValue)
+ {
+ reader.Read();
+ if (reader.EOF)
+ {
+ throw new InvalidDataException("empty reader for message");
+ }
+ }
+
+ byte[] rawdata = reader.ReadContentAsBase64();
+
+ string ct = props.ContentType;
+ if (ct != null)
+ {
+ if (ct.Equals("amqp/map"))
+ {
+ return "mapdata (coming soon)";
+ }
+ }
+
+ return Encoding.UTF8.GetString(rawdata);
+ }
+
+ static void Main(string[] args)
+ {
+ try
+ {
+ Options options = new Options(args);
+
+ AmqpBinaryBinding binding = new AmqpBinaryBinding();
+ binding.BrokerHost = options.Broker;
+ binding.BrokerPort = options.Port;
+ binding.TransferMode = TransferMode.Streamed;
+
+ IChannelFactory<IInputChannel> factory = binding.BuildChannelFactory<IInputChannel>();
+
+ factory.Open();
+ try
+ {
+ System.ServiceModel.EndpointAddress addr = options.Address;
+ IInputChannel receiver = factory.CreateChannel(addr);
+ receiver.Open();
+
+ TimeSpan timeout = options.Timeout;
+ System.ServiceModel.Channels.Message message;
+
+ while (receiver.TryReceive(timeout, out message))
+ {
+ AmqpProperties props = (AmqpProperties)message.Properties["AmqpProperties"];
+
+ Console.WriteLine("Message(properties=" +
+ MessagePropertiesAsString(props) +
+ ", content='" +
+ MessageContentAsString(message, props) +
+ "')");
+ }
+ }
+ finally
+ {
+ factory.Close();
+ }
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("Drain: " + e);
+ }
+ }
+ }
+}
Added: qpid/trunk/qpid/wcf/samples/Integration/Drain/Drain.csproj
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/samples/Integration/Drain/Drain.csproj?rev=1026915&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/samples/Integration/Drain/Drain.csproj (added)
+++ qpid/trunk/qpid/wcf/samples/Integration/Drain/Drain.csproj Sun Oct 24 23:29:37 2010
@@ -0,0 +1,80 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="3.5" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>9.0.21022</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{A67D9B60-34A5-462F-84A2-72C22F623749}</ProjectGuid>
+ <OutputType>Exe</OutputType>
+ <RootNamespace>Drain</RootNamespace>
+ <AssemblyName>Drain</AssemblyName>
+ <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="Apache.Qpid.Channel, Version=1.0.0.0, Culture=neutral, PublicKeyToken=679e1f50b62dbace, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\..\..\src\Apache\Qpid\Channel\bin\Release\Apache.Qpid.Channel.dll</HintPath>
+ </Reference>
+ <Reference Include="Apache.Qpid.Interop, Version=1.0.3796.12140, Culture=neutral, PublicKeyToken=679e1f50b62dbace, processorArchitecture=AMD64">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\..\..\src\Apache\Qpid\Channel\bin\Release\Apache.Qpid.Interop.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Runtime.Serialization">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.ServiceModel">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="Drain.cs" />
+ <Compile Include="..\Util\Options.cs" />
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
Added: qpid/trunk/qpid/wcf/samples/Integration/Integration.sln
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/samples/Integration/Integration.sln?rev=1026915&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/samples/Integration/Integration.sln (added)
+++ qpid/trunk/qpid/wcf/samples/Integration/Integration.sln Sun Oct 24 23:29:37 2010
@@ -0,0 +1,46 @@
+
+Microsoft Visual Studio Solution File, Format Version 10.00
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License
+#
+# Visual Studio 2008
+
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Drain", "Drain\Drain.csproj", "{A67D9B60-34A5-462F-84A2-72C22F623749}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Spout", "Spout\Spout.csproj", "{347A531B-38DB-4848-9E4D-4E5E7F9C97E7}"
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Release|Any CPU = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {A67D9B60-34A5-462F-84A2-72C22F623749}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {A67D9B60-34A5-462F-84A2-72C22F623749}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {A67D9B60-34A5-462F-84A2-72C22F623749}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {A67D9B60-34A5-462F-84A2-72C22F623749}.Release|Any CPU.Build.0 = Release|Any CPU
+ {347A531B-38DB-4848-9E4D-4E5E7F9C97E7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {347A531B-38DB-4848-9E4D-4E5E7F9C97E7}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {347A531B-38DB-4848-9E4D-4E5E7F9C97E7}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {347A531B-38DB-4848-9E4D-4E5E7F9C97E7}.Release|Any CPU.Build.0 = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+EndGlobal
Added: qpid/trunk/qpid/wcf/samples/Integration/Spout/Spout.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/samples/Integration/Spout/Spout.cs?rev=1026915&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/samples/Integration/Spout/Spout.cs (added)
+++ qpid/trunk/qpid/wcf/samples/Integration/Spout/Spout.cs Sun Oct 24 23:29:37 2010
@@ -0,0 +1,109 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Samples.Integration
+{
+ using System;
+ using System.Collections;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using System.IO;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using System.ServiceModel.Description;
+ using System.Text;
+ using System.Xml;
+ using Apache.Qpid.Channel;
+ using Apache.Qpid.AmqpTypes;
+
+ class Spout
+ {
+ static void Main(string[] args)
+ {
+ try
+ {
+ Options options = new Options(args);
+
+ AmqpBinaryBinding binding = new AmqpBinaryBinding();
+ binding.BrokerHost = options.Broker;
+ binding.BrokerPort = options.Port;
+ binding.TransferMode = TransferMode.Streamed;
+
+ IChannelFactory<IOutputChannel> factory = binding.BuildChannelFactory<IOutputChannel>();
+
+ factory.Open();
+ try
+ {
+ System.ServiceModel.EndpointAddress addr = options.Address;
+ IOutputChannel sender = factory.CreateChannel(addr);
+ sender.Open();
+
+ MyRawBodyWriter.Initialize(options.Content);
+ DateTime end = DateTime.Now.Add(options.Timeout);
+ System.ServiceModel.Channels.Message message;
+
+ for (int count = 0; ((count < options.Count) || (options.Count == 0)) &&
+ ((options.Timeout == TimeSpan.Zero) || (end.CompareTo(DateTime.Now) > 0)); count++)
+ {
+ message = Message.CreateMessage(MessageVersion.None, "", new MyRawBodyWriter());
+ AmqpProperties props = new AmqpProperties();
+ props.ContentType = "text/plain";
+
+ string id = Guid.NewGuid().ToString() + ":" + count;
+ props.PropertyMap.Add("spout-id", new AmqpString(id));
+
+ message.Properties["AmqpProperties"] = props;
+ sender.Send(message);
+ }
+ }
+ finally
+ {
+ factory.Close();
+ }
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("Spout: " + e);
+ }
+ }
+
+
+ public class MyRawBodyWriter : BodyWriter
+ {
+ static byte[] body;
+
+ public MyRawBodyWriter()
+ : base(false)
+ {
+ }
+
+ public static void Initialize(String content)
+ {
+ body = Encoding.UTF8.GetBytes(content);
+ }
+
+ // invoked by the binary encoder when the message is written
+ protected override void OnWriteBodyContents(XmlDictionaryWriter writer)
+ {
+ writer.WriteStartElement("Binary");
+ writer.WriteBase64(body, 0, body.Length);
+ }
+ }
+ }
+}
Added: qpid/trunk/qpid/wcf/samples/Integration/Spout/Spout.csproj
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/samples/Integration/Spout/Spout.csproj?rev=1026915&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/samples/Integration/Spout/Spout.csproj (added)
+++ qpid/trunk/qpid/wcf/samples/Integration/Spout/Spout.csproj Sun Oct 24 23:29:37 2010
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="3.5" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>9.0.21022</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{347A531B-38DB-4848-9E4D-4E5E7F9C97E7}</ProjectGuid>
+ <OutputType>Exe</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Spout</RootNamespace>
+ <AssemblyName>Spout</AssemblyName>
+ <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="Apache.Qpid.Channel, Version=1.0.0.0, Culture=neutral, PublicKeyToken=679e1f50b62dbace, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\..\..\src\Apache\Qpid\Channel\bin\Release\Apache.Qpid.Channel.dll</HintPath>
+ </Reference>
+ <Reference Include="Apache.Qpid.Interop, Version=1.0.3796.12140, Culture=neutral, PublicKeyToken=679e1f50b62dbace, processorArchitecture=AMD64">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\..\..\src\Apache\Qpid\Channel\bin\Release\Apache.Qpid.Interop.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Runtime.Serialization">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.ServiceModel">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="..\Util\Options.cs" />
+ <Compile Include="Spout.cs" />
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
Added: qpid/trunk/qpid/wcf/samples/Integration/Util/Options.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/samples/Integration/Util/Options.cs?rev=1026915&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/samples/Integration/Util/Options.cs (added)
+++ qpid/trunk/qpid/wcf/samples/Integration/Util/Options.cs Sun Oct 24 23:29:37 2010
@@ -0,0 +1,157 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Samples.Integration
+{
+ using System;
+ using System.Collections;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using System.IO;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using System.ServiceModel.Description;
+ using System.Text;
+ using System.Xml;
+
+ public class Options
+ {
+ private string broker;
+ private int port;
+ private int messageCount;
+ private EndpointAddress address;
+ private TimeSpan timeout;
+ private string content;
+
+ public Options(string[] args)
+ {
+ this.broker = "127.0.0.1";
+ this.port = 5672;
+ this.messageCount = 1;
+ this.timeout = TimeSpan.FromSeconds(0);
+ Parse(args);
+ }
+
+ private void Parse(string[] args)
+ {
+ int argCount = args.Length;
+ int current = 0;
+ bool typeSelected = false;
+
+ while ((current + 1) < argCount)
+ {
+ string arg = args[current];
+ if (arg == "--count")
+ {
+ arg = args[++current];
+ int i = Int32.Parse(arg);
+ if (i >= 0)
+ {
+ this.messageCount = i;
+ }
+ }
+ else if (arg == "--broker")
+ {
+ this.broker = args[++current];
+ }
+ else if (arg == "--port")
+ {
+ arg = args[++current];
+ int i = int.Parse(arg);
+ if (i > 0)
+ {
+ this.port = i;
+ }
+ }
+ else if (arg == "--timeout")
+ {
+ arg = args[++current];
+ int i = int.Parse(arg);
+ if (i > 0)
+ {
+ this.timeout = TimeSpan.FromSeconds(i);
+ }
+ }
+
+ else if (arg == "--content")
+ {
+ this.content = args[++current];
+ }
+
+ else
+ {
+ throw new ArgumentException(String.Format("unknown argument \"{0}\"", arg));
+ }
+
+ current++;
+ }
+
+ if (current == argCount)
+ {
+ throw new ArgumentException("missing argument: address");
+ }
+
+ address = new EndpointAddress("amqp:" + args[current]);
+
+ if (timeout < TimeSpan.FromMilliseconds(100))
+ {
+ // WCF timeout of 0 really means no time for even a single message transfer
+ timeout = TimeSpan.FromMilliseconds(100);
+ }
+ }
+
+ public EndpointAddress Address
+ {
+ get { return this.address; }
+ }
+
+ public string Broker
+ {
+ get { return this.broker; }
+ }
+
+ public string Content
+ {
+ get
+ {
+ if (content == null)
+ {
+ return String.Empty;
+ }
+ return content;
+ }
+ }
+
+
+ public int Count
+ {
+ get { return this.messageCount; }
+ }
+
+ public int Port
+ {
+ get { return this.port; }
+ }
+
+ public TimeSpan Timeout
+ {
+ get { return this.timeout; }
+ }
+ }
+}
Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs?rev=1026915&r1=1026914&r2=1026915&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs Sun Oct 24 23:29:37 2010
@@ -29,7 +29,7 @@ namespace Apache.Qpid.AmqpTypes
// AMQP 0-10 delivery properties
private bool durable;
private Nullable<TimeSpan> timeToLive;
- private string routingKey;
+ private string subject;
// AMQP 0-10 message properties
private string replyToExchange;
@@ -50,7 +50,7 @@ namespace Apache.Qpid.AmqpTypes
{
get
{
- return ((this.routingKey != null) || this.durable || this.timeToLive.HasValue);
+ return ((this.subject != null) || this.durable || this.timeToLive.HasValue);
}
}
@@ -163,10 +163,19 @@ namespace Apache.Qpid.AmqpTypes
set { this.timeToLive = value; }
}
+ /// <summary>
+ /// Obsolete: switch to AMQP 1.0 "Subject" naming
+ /// </summary>
public string RoutingKey
{
- get { return this.routingKey; }
- set { this.routingKey = value; }
+ get { return this.subject; }
+ set { this.subject = value; }
+ }
+
+ public string Subject
+ {
+ get { return this.subject; }
+ set { this.subject = value; }
}
public string ReplyToExchange
@@ -200,7 +209,7 @@ namespace Apache.Qpid.AmqpTypes
public void Clear()
{
this.timeToLive = null;
- this.routingKey = null;
+ this.subject = null;
this.replyToRoutingKey = null;
this.replyToExchange = null;
this.durable = false;
@@ -251,9 +260,9 @@ namespace Apache.Qpid.AmqpTypes
this.replyToRoutingKey = other.replyToRoutingKey;
}
- if (other.routingKey != null)
+ if (other.subject != null)
{
- this.routingKey = other.routingKey;
+ this.subject = other.subject;
}
if (other.durable)
Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs?rev=1026915&r1=1026914&r2=1026915&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs Sun Oct 24 23:29:37 2010
@@ -32,6 +32,7 @@ namespace Apache.Qpid.Channel
using System.Text;
using System.Threading;
using System.Globalization;
+ using System.Web;
using System.Xml;
// the thin interop layer that provides access to the Qpid AMQP client libraries
@@ -52,11 +53,11 @@ namespace Apache.Qpid.Channel
private bool shared;
private int prefetchLimit;
private string encoderContentType;
+ // AMQP subject/routing key
+ private string subject;
+ // Qpid addressing value for "qpid.subject" property
+ private string qpidSubject;
- // input = 0-10 queue, output = 0-10 exchange
- private string queueName;
-
- private String routingKey;
private BufferManager bufferManager;
private AmqpProperties outputMessageProperties;
@@ -85,7 +86,7 @@ namespace Apache.Qpid.Channel
this.remoteAddress = remoteAddress;
// pull out host, port, queue, and connection arguments
- this.ParseAmqpUri(remoteAddress.Uri);
+ string qpidAddress = this.UriToQpidAddress(remoteAddress.Uri, out subject);
this.encoder = msgEncoder;
string ct = String.Empty;
@@ -129,12 +130,14 @@ namespace Apache.Qpid.Channel
if (this.isInputChannel)
{
- this.inputLink = ConnectionManager.GetInputLink(this.factoryChannelProperties, shared, false, this.queueName);
+ this.inputLink = ConnectionManager.GetInputLink(this.factoryChannelProperties, shared, false, qpidAddress);
this.inputLink.PrefetchLimit = this.prefetchLimit;
}
else
{
- this.outputLink = ConnectionManager.GetOutputLink(this.factoryChannelProperties, shared, false, this.queueName);
+ this.outputLink = ConnectionManager.GetOutputLink(this.factoryChannelProperties, shared, false, qpidAddress);
+ this.subject = this.outputLink.DefaultSubject;
+ this.qpidSubject = this.outputLink.QpidSubject;
}
}
@@ -423,9 +426,14 @@ namespace Apache.Qpid.Channel
outgoingProperties.MergeFrom(this.factoryChannelProperties.DefaultMessageProperties);
}
- if (this.routingKey != null)
+ if (this.subject != null)
{
- outgoingProperties.RoutingKey = this.routingKey;
+ outgoingProperties.RoutingKey = this.subject;
+ }
+
+ if (this.qpidSubject != null)
+ {
+ outgoingProperties.PropertyMap["qpid.subject"] = new AmqpString(this.qpidSubject);
}
// Add the Properties set by the application on this particular message.
@@ -544,8 +552,7 @@ namespace Apache.Qpid.Channel
this.bufferManager.Clear();
}
- // "amqp:queue1" | "amqp:stocks@broker1.com" | "amqp:queue3?routingkey=key"
- private void ParseAmqpUri(Uri uri)
+ private string UriToQpidAddress(Uri uri, out string subject)
{
if (uri.Scheme != AmqpConstants.Scheme)
{
@@ -553,43 +560,83 @@ namespace Apache.Qpid.Channel
"The scheme {0} specified in address is not supported.", uri.Scheme), "uri");
}
- this.queueName = uri.LocalPath;
+ subject = "";
+ string path = uri.LocalPath;
+ string query = uri.Query;
+
+ // legacy... convert old style myqueue?routingkey=key to myqueue/key
- if ((this.queueName.IndexOf('@') != -1) && this.isInputChannel)
+ if (query.Length > 0)
{
- throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
- "Invalid input queue name: \"{0}\" specified.", this.queueName), "uri");
- }
+ if (!query.StartsWith("?"))
+ {
+ throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
+ "Invalid query argument."), "uri");
+ }
- // search out session parameters in the query portion of the URI
+ string routingParseKey = "routingkey=";
+ string subjectParseKey = "subject=";
+ char[] charSeparators = new char[] { '?', ';' };
+ string[] args = uri.Query.Split(charSeparators, StringSplitOptions.RemoveEmptyEntries);
+ foreach (string s in args)
+ {
+ if (s.StartsWith(routingParseKey))
+ {
+ subject = s.Substring(routingParseKey.Length);
+ }
+ else if (s.StartsWith(subjectParseKey))
+ {
+ subject = s.Substring(subjectParseKey.Length);
+ }
+ else
+ {
+ if (s.Length > 0)
+ {
+ throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
+ "Invalid query argument {0}.", s), "uri");
+ }
+ }
+ }
- string routingParseKey = "routingkey=";
- char[] charSeparators = new char[] { '?', ';' };
- string[] args = uri.Query.Split(charSeparators, StringSplitOptions.RemoveEmptyEntries);
- foreach (string s in args)
- {
- if (s.StartsWith(routingParseKey))
+ if (path.Contains("/"))
+ {
+ throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
+ "Invalid queue name {0}.", path), "uri");
+ }
+
+ if (path.Length == 0)
{
- this.routingKey = s.Substring(routingParseKey.Length);
+ // special case, user wants default exchange
+ return "//" + subject;
}
+
+ return path + "/" + subject;
}
- if (this.queueName == String.Empty)
+ // find subject in "myqueue/mysubject;{mode:browse}"
+ int pos = path.IndexOf('/');
+ if ((pos > -1) && (pos < path.Length + 1))
{
- if (this.isInputChannel)
+ subject = path.Substring(pos);
+ pos = subject.IndexOf(';');
+ if (pos == 0)
{
throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
- "Empty queue target specifier not allowed."), "uri");
+ "Empty subject in address {0}.", path), "uri");
}
- else
+
+ if (pos > 0)
{
- if (this.routingKey == null)
- {
- throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
- "No target queue or routing key specified."), "uri");
- }
+ subject = subject.Substring(0, pos);
}
}
+
+ if (subject.Length > 0)
+ {
+ subject = HttpUtility.UrlDecode(subject);
+ }
+
+ return HttpUtility.UrlDecode(path);
}
}
}
Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj?rev=1026915&r1=1026914&r2=1026915&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj Sun Oct 24 23:29:37 2010
@@ -97,6 +97,7 @@ under the License.
<RequiredTargetFramework>3.0</RequiredTargetFramework>
</Reference>
<Reference Include="System.Transactions" />
+ <Reference Include="System.Web" />
<Reference Include="System.XML" />
</ItemGroup>
<ItemGroup>
Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp?rev=1026915&r1=1026914&r2=1026915&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp Sun Oct 24 23:29:37 2010
@@ -357,17 +357,20 @@ bool AmqpSession::MessageStop(std::strin
return true;
}
-void AmqpSession::AcceptAndComplete(SequenceSet& transfers)
+void AmqpSession::AcceptAndComplete(SequenceSet& transfers, bool browsing)
{
lock l(sessionLock);
- // delimit with session dtx commands depending on the transaction context
- UpdateTransactionState(%l);
+ if (!browsing) {
+ // delimit with session dtx commands depending on the transaction context
+ UpdateTransactionState(%l);
+ }
CheckOpen();
sessionp->markCompleted(transfers, false);
- sessionp->messageAccept(transfers, false);
+ if (!browsing)
+ sessionp->messageAccept(transfers, false);
}
@@ -609,4 +612,22 @@ void AmqpSession::ReleaseCompletion(IntP
delete completion.ToPointer();
}
+
+// Non-exclusive borrowing for a "brief" period. I.e. several synced
+// commands (address resolution)
+
+IntPtr AmqpSession::BorrowNativeSession() {
+ lock l(sessionLock);
+ if (closing)
+ return IntPtr::Zero;
+
+ IncrementSyncs();
+ return (IntPtr) sessionp;
+}
+
+void AmqpSession::ReturnNativeSession() {
+ lock l(sessionLock);
+ DecrementSyncs();
+}
+
}}} // namespace Apache::Qpid::Cli
Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h?rev=1026915&r1=1026914&r2=1026915&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h Sun Oct 24 23:29:37 2010
@@ -78,7 +78,7 @@ public:
OutputLink^ CreateOutputLink(System::String^ targetQueue);
InputLink^ CreateInputLink(System::String^ sourceQueue);
- // 0-10 specific support
+ // 0-10 specific support; deprecated in favor of Qpid messaging addresses
InputLink^ CreateInputLink(System::String^ sourceQueue, bool exclusive, bool temporary, System::String^ filterKey, System::String^ exchange);
void Bind(System::String^ queue, System::String^ exchange, System::String^ filterKey);
@@ -90,7 +90,7 @@ internal:
void internalWaitForCompletion(IntPtr future);
void removeWaiter(CompletionWaiter^ waiter);
bool MessageStop(std::string &name);
- void AcceptAndComplete(SequenceSet& transfers);
+ void AcceptAndComplete(SequenceSet& transfers, bool browsing);
IntPtr BeginPhase0Flush(XaTransaction^);
void EndPhase0Flush(XaTransaction^, IntPtr);
IntPtr DtxStart(IntPtr xidp, bool, bool);
@@ -98,6 +98,8 @@ internal:
IntPtr DtxCommit(IntPtr xidp, bool onePhase);
IntPtr DtxRollback(IntPtr xidp);
void ReleaseCompletion(IntPtr completion);
+ IntPtr BorrowNativeSession();
+ void ReturnNativeSession();
property AmqpConnection^ Connection {
AmqpConnection^ get () { return connection; }
Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp?rev=1026915&r1=1026914&r2=1026915&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp Sun Oct 24 23:29:37 2010
@@ -88,9 +88,12 @@ InputLink::InputLink(AmqpSession^ sessio
waiters = gcnew Collections::Generic::List<MessageWaiter^>();
linkLock = waiters; // private and available
subscriptionLock = gcnew Object();
+ qpidAddress = QpidAddress::CreateAddress(sourceQueue, true);
+ qpidAddress->ResolveLink(session);
+ browsing = qpidAddress->Browsing;
try {
- std::string qname = QpidMarshal::ToNative(sourceQueue);
+ std::string qname = QpidMarshal::ToNative(qpidAddress->LinkName);
if (temporary) {
qpidSessionp->queueDeclare(arg::queue=qname, arg::durable=false, arg::autoDelete=true, arg::exclusive=true);
@@ -104,6 +107,15 @@ InputLink::InputLink(AmqpSession^ sessio
settings.flowControl = FlowControl::messageCredit(0);
settings.completionMode = CompletionMode::MANUAL_COMPLETION;
+ if (browsing) {
+ settings.acquireMode = AcquireMode::ACQUIRE_MODE_NOT_ACQUIRED;
+ settings.acceptMode = AcceptMode::ACCEPT_MODE_NONE;
+ }
+ else {
+ settings.acquireMode = AcquireMode::ACQUIRE_MODE_PRE_ACQUIRED;
+ settings.acceptMode = AcceptMode::ACCEPT_MODE_EXPLICIT;
+ }
+
Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, settings);
subscriptionp = new Subscription (sub); // copy smart pointer for later IDisposable cleanup
@@ -186,8 +198,10 @@ void InputLink::Cleanup()
{
ReleaseNative();
}
-
}
+
+ // Now that subscription is torn down, we can execute pending delete on remote node
+ qpidAddress->CleanupLink(amqpSession);
amqpSession->NotifyClosed();
}
@@ -699,7 +713,7 @@ AmqpMessage^ InputLink::createAmqpMessag
// subscriptionp->accept(frameSetID) is a slow sync operation in the native API
// so do it within the AsyncSession directly
- amqpSession->AcceptAndComplete(frameSetID);
+ amqpSession->AcceptAndComplete(frameSetID, browsing);
workingCredit--;
// check if more messages need to be requested from broker
Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h?rev=1026915&r1=1026914&r2=1026915&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h Sun Oct 24 23:29:37 2010
@@ -20,6 +20,7 @@
#pragma once
#include "MessageWaiter.h"
+#include "QpidAddress.h"
namespace Apache {
namespace Qpid {
@@ -58,6 +59,9 @@ private:
// working credit low water mark
int minWorkingCredit;
+ bool browsing;
+ QpidAddress^ qpidAddress;
+
void Cleanup();
void ReleaseNative();
bool haveMessage();
@@ -97,6 +101,10 @@ public:
void set (int value);
}
+ property bool Browsing {
+ bool get () { return browsing; }
+ }
+
};
}}} // namespace Apache::Qpid::Interop
Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj?rev=1026915&r1=1026914&r2=1026915&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj Sun Oct 24 23:29:37 2010
@@ -409,6 +409,10 @@
>
</File>
<File
+ RelativePath=".\QpidAddress.cpp"
+ >
+ </File>
+ <File
RelativePath=".\InputLink.cpp"
>
</File>
@@ -455,6 +459,10 @@
>
</File>
<File
+ RelativePath=".\QpidAddress.h"
+ >
+ </File>
+ <File
RelativePath=".\InputLink.h"
>
</File>
Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp?rev=1026915&r1=1026914&r2=1026915&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp Sun Oct 24 23:29:37 2010
@@ -48,13 +48,14 @@ using namespace std;
using namespace Apache::Qpid::AmqpTypes;
-OutputLink::OutputLink(AmqpSession^ session, String^ defaultQueue) :
+OutputLink::OutputLink(AmqpSession^ session, String^ address) :
amqpSession(session),
- queue(defaultQueue),
disposed(false),
maxFrameSize(session->Connection->MaxFrameSize),
finalizing(false)
{
+ qpidAddress = QpidAddress::CreateAddress(address, false);
+ qpidAddress->ResolveLink(session);
}
void OutputLink::Cleanup()
@@ -67,6 +68,8 @@ void OutputLink::Cleanup()
disposed = true;
}
+ // process any pending queue delete
+ qpidAddress->CleanupLink(amqpSession);
amqpSession->NotifyClosed();
}
@@ -217,7 +220,8 @@ void OutputLink::Send(AmqpMessage^ amqpM
ManagedToNative(amqpMessage);
MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream;
- CompletionWaiter^ waiter = amqpSession->SendMessage(queue, messageBodyStream, timeout, false, nullptr, nullptr);
+ CompletionWaiter^ waiter = amqpSession->SendMessage(qpidAddress->LinkName, messageBodyStream,
+ timeout, false, nullptr, nullptr);
if (waiter != nullptr) {
waiter->WaitForCompletion();
@@ -234,7 +238,7 @@ IAsyncResult^ OutputLink::BeginSend(Amqp
ManagedToNative(amqpMessage);
MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream;
- CompletionWaiter^ waiter = amqpSession->SendMessage(queue, messageBodyStream, timeout, true, callback, state);
+ CompletionWaiter^ waiter = amqpSession->SendMessage(qpidAddress->LinkName, messageBodyStream, timeout, true, callback, state);
return waiter;
}
Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h?rev=1026915&r1=1026914&r2=1026915&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h Sun Oct 24 23:29:37 2010
@@ -19,6 +19,8 @@
#pragma once
+#include "QpidAddress.h"
+
namespace Apache {
namespace Qpid {
namespace Interop {
@@ -34,7 +36,7 @@ public ref class OutputLink
{
private:
AmqpSession^ amqpSession;
- String^ queue;
+ QpidAddress^ qpidAddress;
bool disposed;
bool finalizing;
void Cleanup();
@@ -58,6 +60,15 @@ public:
AmqpTypes::AmqpProperties^ get () { return defaultProperties; }
void set(AmqpTypes::AmqpProperties^ p) { defaultProperties = p; }
}
+
+ property String^ DefaultSubject {
+ String^ get() { return (qpidAddress == nullptr) ? nullptr : qpidAddress->RoutingKey; }
+ }
+
+ property String^ QpidSubject {
+ String^ get() { return (qpidAddress == nullptr) ? nullptr : qpidAddress->Subject; }
+ }
+
};
Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp?rev=1026915&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp Sun Oct 24 23:29:37 2010
@@ -0,0 +1,304 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+
+/*
+ * This program parses strings of the form "node/subject;{options}" as
+ * used in the Qpid messaging API. It provides basic wiring
+ * capabilities to create/delete temporary queues (to topic
+ * subsciptions) and unbound "point and shoot" queues.
+ */
+
+
+#include <windows.h>
+#include <msclr\lock.h>
+#include <oletx2xa.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+#include "qpid/client/Message.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/client/Future.h"
+
+#include "AmqpConnection.h"
+#include "AmqpSession.h"
+#include "AmqpMessage.h"
+#include "MessageBodyStream.h"
+#include "InputLink.h"
+#include "OutputLink.h"
+#include "QpidMarshal.h"
+#include "QpidException.h"
+#include "QpidAddress.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace msclr;
+
+using namespace qpid::client;
+using namespace std;
+
+QpidAddress::QpidAddress(String^ s, bool isInput) {
+ address = s;
+ nodeName = s;
+ isInputChannel = isInput;
+ isQueue = true;
+
+ if (address->StartsWith("//")) {
+ // special case old style address to default exchange,
+ // no options, output only
+ if ((s->IndexOf(';') != -1) || isInputChannel)
+ throw gcnew ArgumentException("Invalid 0-10 address: " + address);
+ nodeName = nodeName->Substring(2);
+ return;
+ }
+
+ String^ options = nullptr;
+ int pos = s->IndexOf(';');
+ if (pos != -1) {
+ options = s->Substring(pos + 1);
+ nodeName = s->Substring(0, pos);
+
+ if (options->Length > 0) {
+ if (!options->StartsWith("{") || !options->EndsWith("}"))
+ throw gcnew ArgumentException("Invalid address: " + address);
+ options = options->Substring(1, options->Length - 2);
+ array<String^>^ subOpts = options->Split(String(",: ").ToCharArray(), StringSplitOptions::RemoveEmptyEntries);
+
+ if ((subOpts->Length % 2) != 0)
+ throw gcnew ArgumentException("Bad address (options): " + address);
+
+ for (int i=0; i < subOpts->Length; i += 2) {
+ String^ opt = subOpts[i];
+ String^ optArg = subOpts[i+1];
+ if (opt->Equals("create")) {
+ creating = PolicyApplies(optArg);
+ }
+ else if (opt->Equals("delete")) {
+ deleting = PolicyApplies(optArg);
+ }
+ else if (opt->Equals("mode")) {
+ if (optArg->Equals("browse")) {
+ browsing = isInputChannel;
+ }
+ else if (!optArg->Equals("consume")) {
+ throw gcnew ArgumentException("Invalid browsing option: " + optArg);
+ }
+ }
+ else if (opt->Equals("assert") || opt->Equals("node")) {
+ throw gcnew ArgumentException("Unsupported address option: " + opt);
+ }
+ else {
+ throw gcnew ArgumentException("Bad address option: " + opt);
+ }
+ }
+ }
+ else
+ options = nullptr;
+ }
+
+ pos = nodeName->IndexOf('/');
+ if (pos != -1) {
+ subject = nodeName->Substring(pos + 1);
+ if (String::IsNullOrEmpty(subject))
+ subject = nullptr;
+ nodeName = nodeName->Substring(0, pos);
+ }
+}
+
+
+QpidAddress^ QpidAddress::CreateAddress(String^ s, bool isInput) {
+ QpidAddress^ addr = gcnew QpidAddress(s, isInput);
+ return addr;
+}
+
+
+void QpidAddress::ResolveLink(AmqpSession^ amqpSession) {
+
+ AsyncSession* asyncSessionp = (AsyncSession *) amqpSession->BorrowNativeSession().ToPointer();
+ if (asyncSessionp == NULL)
+ throw gcnew ObjectDisposedException("session");
+
+ deleteName = nullptr;
+ isQueue = true;
+
+ try {
+ Session session = sync(*asyncSessionp);
+ std::string n_name = QpidMarshal::ToNative(nodeName);
+ ExchangeBoundResult result = session.exchangeBound(arg::exchange=n_name, arg::queue=n_name);
+
+ bool queueFound = !result.getQueueNotFound();
+ bool exchangeFound = !result.getExchangeNotFound();
+
+ if (isInputChannel) {
+
+ if (queueFound) {
+ linkName = nodeName;
+ if (deleting)
+ deleteName = nodeName;
+ }
+ else if (exchangeFound) {
+ isQueue = false;
+ String^ tmpkey = nullptr;
+ String^ tmpname = nodeName + "_" + Guid::NewGuid().ToString();
+ bool haveSubject = !String::IsNullOrEmpty(subject);
+ FieldTable bindArgs;
+
+ std::string exchangeType = session.exchangeQuery(n_name).getType();
+ if (exchangeType == "topic") {
+ tmpkey = haveSubject ? subject : "#";
+ }
+ else if (exchangeType == "fanout") {
+ tmpkey = tmpname;
+ }
+ else if (exchangeType == "headers") {
+ tmpkey = haveSubject ? subject : "match-all";
+ if (haveSubject)
+ bindArgs.setString("qpid.subject", QpidMarshal::ToNative(subject));
+ bindArgs.setString("x-match", "all");
+ }
+ else if (exchangeType == "xml") {
+ tmpkey = haveSubject ? subject : "";
+ if (haveSubject) {
+ String^ v = "declare variable $qpid.subject external; $qpid.subject = '" +
+ subject + "'";
+ bindArgs.setString("xquery", QpidMarshal::ToNative(v));
+ }
+ else
+ bindArgs.setString("xquery", "true()");
+ }
+ else {
+ tmpkey = haveSubject ? subject : "";
+ }
+
+ std::string qn = QpidMarshal::ToNative(tmpname);
+ session.queueDeclare(arg::queue=qn, arg::autoDelete=true, arg::exclusive=true);
+ bool success = false;
+ try {
+ session.exchangeBind(arg::exchange=n_name, arg::queue=qn,
+ arg::bindingKey=QpidMarshal::ToNative(tmpkey),
+ arg::arguments=bindArgs);
+ bindKey = tmpkey; // remember for later cleanup
+ success = true;
+ }
+ finally {
+ if (!success)
+ session.queueDelete(arg::queue=qn);
+ }
+ linkName = tmpname;
+ deleteName = tmpname;
+ deleting = true;
+ }
+ else if (creating) {
+ // only create "point and shoot" queues for now
+ session.queueDeclare(arg::queue=QpidMarshal::ToNative(nodeName));
+ // leave unbound
+
+ linkName = nodeName;
+
+ if (deleting)
+ deleteName = nodeName;
+ }
+ else {
+ throw gcnew ArgumentException("AMQP broker node not found: " + nodeName);
+ }
+ }
+ else {
+ // Output channel
+
+ bool oldStyleUri = address->StartsWith("//");
+
+ if (queueFound) {
+ linkName = ""; // default exchange for point and shoot
+ routingKey = nodeName;
+ if (deleting)
+ deleteName = nodeName;
+ }
+ else if (exchangeFound && !oldStyleUri) {
+ isQueue = false;
+ linkName = nodeName;
+ routingKey = subject;
+ }
+ else if (creating) {
+ // only create "point and shoot" queues for now
+ session.queueDeclare(arg::queue=QpidMarshal::ToNative(nodeName));
+ // leave unbound
+ linkName = "";
+ routingKey = nodeName;
+ if (deleting)
+ deleteName = nodeName;
+ }
+ else {
+ throw gcnew ArgumentException("AMQP broker node not found: " + nodeName);
+ }
+ }
+ }
+ finally {
+ amqpSession->ReturnNativeSession();
+ }
+}
+
+void QpidAddress::CleanupLink(AmqpSession^ amqpSession) {
+ if (deleteName == nullptr)
+ return;
+
+ AsyncSession* asyncSessionp = (AsyncSession *) amqpSession->BorrowNativeSession().ToPointer();
+ if (asyncSessionp == NULL) {
+ // TODO: log it: can't undo tear down actions
+ return;
+ }
+
+ try {
+ Session session = sync(*asyncSessionp);
+ std::string q = QpidMarshal::ToNative(deleteName);
+ if (isInputChannel && !isQueue) {
+ // undo the temp wiring to the topic
+ session.exchangeUnbind(arg::exchange=QpidMarshal::ToNative(nodeName), arg::queue=q,
+ arg::bindingKey=QpidMarshal::ToNative(bindKey));
+ }
+ session.queueDelete(q);
+ }
+ catch (Exception^ e) {
+ // TODO: log it
+ }
+ finally {
+ amqpSession->ReturnNativeSession();
+ }
+}
+
+bool QpidAddress::PolicyApplies(String^ mode) {
+ if (mode->Equals("always"))
+ return true;
+ if (mode->Equals("sender"))
+ return !isInputChannel;
+ if (mode->Equals("receiver"))
+ return isInputChannel;
+ if (mode->Equals("never"))
+ return false;
+
+ throw gcnew ArgumentException(String::Format("Bad address option {0} for {1}", mode, address));
+}
+
+}}} // namespace Apache::Qpid::Interop
Added: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h?rev=1026915&view=auto
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h (added)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h Sun Oct 24 23:29:37 2010
@@ -0,0 +1,89 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+#include "MessageWaiter.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Threading;
+using namespace System::Runtime::InteropServices;
+
+using namespace qpid::client;
+using namespace std;
+
+
+public ref class QpidAddress
+{
+private:
+ QpidAddress(String^ address, bool isInput);
+
+ // the original Qpid messaging address string, with WCF uri sematics removed, and URL decoded
+ String^ address;
+
+ String^ nodeName;
+ // "qpid.subject"
+ String^ subject;
+ // 0-10 routing key (Output channels only)
+ String^ routingKey;
+
+ String^ linkName;
+ String^ deleteName;
+ String^ bindKey;
+
+ // node type: queue/topic
+ bool isQueue;
+
+ // direction
+ bool isInputChannel;
+
+ bool creating;
+ bool deleting;
+ bool browsing;
+
+ bool PolicyApplies(String^ mode);
+
+internal:
+ static QpidAddress^ CreateAddress(String ^s, bool isInput);
+ void ResolveLink(AmqpSession^ amqpSession);
+ void CleanupLink(AmqpSession^ amqpSession);
+
+ property String^ LinkName {
+ String^ get () { return linkName; }
+ }
+
+ property String^ Subject {
+ String^ get () { return subject; }
+ }
+
+ property String^ RoutingKey {
+ String^ get () { return routingKey; }
+ }
+
+ property bool Browsing {
+ bool get () { return browsing; }
+ }
+
+};
+
+}}} // namespace Apache::Qpid::Interop
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org