You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/12/04 00:11:06 UTC

svn commit: r886978 [1/2] - in /activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp: ./ Commands/ Protocol/ Transport/ Transport/Tcp/ Util/

Author: tabish
Date: Thu Dec  3 23:11:03 2009
New Revision: 886978

URL: http://svn.apache.org/viewvc?rev=886978&view=rev
Log:
Commit a bunch of initial work on porting the Stomp bits plus other useful classes over from NMS.ActiveMQ.  None of this compiles yet.

Added:
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseDataStructure.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructure.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Response.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrameStream.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompHelper.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransportFactory.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/IWireFormat.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/LoggingTransport.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/MutexTransport.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ResponseCorrelator.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFactory.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFilter.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Util/MessageDispatchChannel.cs   (with props)

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs Thu Dec  3 23:11:03 2009
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//
+// Marshalling code for Open Wire Format for BaseCommand
+//
+//
+// NOTE!: This file is autogenerated - do not modify!
+//        if you need to make a change, please see the Groovy scripts in the
+//        activemq-openwire module
+//
+
+using System;
+
+namespace Apache.NMS.Stomp.Commands
+{
+    public abstract class BaseCommand : BaseDataStructure, Command, ICloneable
+    {
+        private int commandId;
+        private bool responseRequired = false;
+
+        public int CommandId
+        {
+            get { return commandId; }
+            set { this.commandId = value; }
+        }
+
+        public override int GetHashCode()
+        {
+            return (CommandId * 37) + GetDataStructureType();
+        }
+
+        public override bool Equals(Object that)
+        {
+            if(that is BaseCommand)
+            {
+                BaseCommand thatCommand = (BaseCommand) that;
+                return this.GetDataStructureType() == thatCommand.GetDataStructureType()
+                    && this.CommandId == thatCommand.CommandId;
+            }
+            return false;
+        }
+
+        public override String ToString()
+        {
+            string answer = GetDataStructureTypeAsString(GetDataStructureType());
+            if(answer.Length == 0)
+            {
+                answer = base.ToString();
+            }
+            return answer + ": id = " + CommandId;
+        }
+
+        public static String GetDataStructureTypeAsString(int type)
+        {
+            String packetTypeStr = "";
+            switch(type)
+            {
+                case ActiveMQMessage.ID_ACTIVEMQMESSAGE:
+                    packetTypeStr = "ACTIVEMQ_MESSAGE";
+                    break;
+                case ActiveMQTextMessage.ID_ACTIVEMQTEXTMESSAGE:
+                    packetTypeStr = "ACTIVEMQ_TEXT_MESSAGE";
+                    break;
+                case ActiveMQObjectMessage.ID_ACTIVEMQOBJECTMESSAGE:
+                    packetTypeStr = "ACTIVEMQ_OBJECT_MESSAGE";
+                    break;
+                case ActiveMQBytesMessage.ID_ACTIVEMQBYTESMESSAGE:
+                    packetTypeStr = "ACTIVEMQ_BYTES_MESSAGE";
+                    break;
+                case ActiveMQStreamMessage.ID_ACTIVEMQSTREAMMESSAGE:
+                    packetTypeStr = "ACTIVEMQ_STREAM_MESSAGE";
+                    break;
+                case ActiveMQMapMessage.ID_ACTIVEMQMAPMESSAGE:
+                    packetTypeStr = "ACTIVEMQ_MAP_MESSAGE";
+                    break;
+                case MessageAck.ID_MESSAGEACK:
+                    packetTypeStr = "ACTIVEMQ_MSG_ACK";
+                    break;
+                case Response.ID_RESPONSE:
+                    packetTypeStr = "RESPONSE";
+                    break;
+                case ConsumerInfo.ID_CONSUMERINFO:
+                    packetTypeStr = "CONSUMER_INFO";
+                    break;
+                case ProducerInfo.ID_PRODUCERINFO:
+                    packetTypeStr = "PRODUCER_INFO";
+                    break;
+                case TransactionInfo.ID_TRANSACTIONINFO:
+                    packetTypeStr = "TRANSACTION_INFO";
+                    break;
+                case BrokerInfo.ID_BROKERINFO:
+                    packetTypeStr = "BROKER_INFO";
+                    break;
+                case ConnectionInfo.ID_CONNECTIONINFO:
+                    packetTypeStr = "CONNECTION_INFO";
+                    break;
+                case SessionInfo.ID_SESSIONINFO:
+                    packetTypeStr = "SESSION_INFO";
+                    break;
+                case RemoveSubscriptionInfo.ID_REMOVESUBSCRIPTIONINFO:
+                    packetTypeStr = "DURABLE_UNSUBSCRIBE";
+                    break;
+                case IntegerResponse.ID_INTEGERRESPONSE:
+                    packetTypeStr = "INT_RESPONSE_RECEIPT_INFO";
+                    break;
+                case WireFormatInfo.ID_WIREFORMATINFO:
+                    packetTypeStr = "WIRE_FORMAT_INFO";
+                    break;
+                case RemoveInfo.ID_REMOVEINFO:
+                    packetTypeStr = "REMOVE_INFO";
+                    break;
+                case KeepAliveInfo.ID_KEEPALIVEINFO:
+                    packetTypeStr = "KEEP_ALIVE";
+                    break;
+            }
+            return packetTypeStr;
+        }
+
+        public virtual Response visit(ICommandVisitor visitor)
+        {
+            throw new ApplicationException("BaseCommand.Visit() not implemented");
+        }
+
+        public virtual bool IsBrokerInfo
+        {
+            get
+            {
+                return false;
+            }
+        }
+
+        public virtual bool IsConnectionInfo
+        {
+            get
+            {
+                return false;
+            }
+        }
+
+        public virtual bool IsConnectionError
+        {
+            get
+            {
+                return false;
+            }
+        }
+
+        public virtual bool IsConsumerInfo
+        {
+            get
+            {
+                return false;
+            }
+        }
+
+        public virtual bool IsControlCommand
+        {
+            get
+            {
+                return false;
+            }
+        }
+
+        public virtual bool IsDestinationInfo
+        {
+            get
+            {
+                return false;
+            }
+        }
+
+        public virtual bool IsMessage
+        {
+            get
+            {
+                return false;
+            }
+        }
+
+        public virtual bool IsMessageAck
+        {
+            get
+            {
+                return false;
+            }
+        }
+
+        public virtual bool IsMessageDispatch
+        {
+            get
+            {
+                return false;
+            }
+        }
+
+        public virtual bool IsProducerAck
+        {
+            get
+            {
+                return false;
+            }
+        }
+
+        public virtual bool IsProducerInfo
+        {
+            get
+            {
+                return false;
+            }
+        }
+
+        public virtual bool IsRemoveInfo
+        {
+            get
+            {
+                return false;
+            }
+        }
+
+        public virtual bool IsRemoveSubscriptionInfo
+        {
+            get
+            {
+                return false;
+            }
+        }
+
+        public virtual bool IsResponse
+        {
+            get
+            {
+                return false;
+            }
+        }
+
+        public virtual bool IsSessionInfo
+        {
+            get
+            {
+                return false;
+            }
+        }
+
+        public virtual bool IsShutdownInfo
+        {
+            get
+            {
+                return false;
+            }
+        }
+
+        public virtual bool IsTransactionInfo
+        {
+            get
+            {
+                return false;
+            }
+        }
+
+        public virtual bool IsWireFormatInfo
+        {
+            get
+            {
+                return false;
+            }
+        }
+
+        public virtual bool ResponseRequired
+        {
+            get
+            {
+                return responseRequired;
+            }
+            set
+            {
+                responseRequired = value;
+            }
+        }
+
+        public override Object Clone()
+        {
+            // Since we are a derived class use the base's Clone()
+            // to perform the shallow copy. Since it is shallow it
+            // will include our derived class. Since we are derived,
+            // this method is an override.
+            BaseCommand o = (BaseCommand) base.Clone();
+
+            return o;
+        }
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseDataStructure.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseDataStructure.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseDataStructure.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseDataStructure.cs Thu Dec  3 23:11:03 2009
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.Stomp.Commands
+{
+
+    /// <summary>
+    /// Base class for all DataStructure implementations
+    /// </summary>
+    public abstract class BaseDataStructure : DataStructure, ICloneable
+    {
+        public virtual byte GetDataStructureType()
+        {
+            return 0;
+        }
+
+        public virtual bool IsMarshallAware()
+        {
+            return false;
+        }
+
+        public virtual void BeforeMarshall(OpenWireFormat wireFormat)
+        {
+        }
+
+        public virtual void AfterMarshall(OpenWireFormat wireFormat)
+        {
+        }
+
+        public virtual void BeforeUnmarshall(OpenWireFormat wireFormat)
+        {
+        }
+
+        public virtual void AfterUnmarshall(OpenWireFormat wireFormat)
+        {
+        }
+
+        public virtual void SetMarshalledForm(OpenWireFormat wireFormat, byte[] data)
+        {
+        }
+
+        public virtual byte[] GetMarshalledForm(OpenWireFormat wireFormat)
+        {
+            return null;
+        }
+
+        // Helper methods
+        public int HashCode(object value)
+        {
+            if(value != null)
+            {
+                return value.GetHashCode();
+            }
+            else
+            {
+                return -1;
+            }
+        }
+
+        public virtual Object Clone()
+        {
+            // Since we are the lowest level base class, do a
+            // shallow copy which will include the derived classes.
+            // From here we would do deep cloning of other objects
+            // if we had any.
+            return this.MemberwiseClone();
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseDataStructure.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs Thu Dec  3 23:11:03 2009
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.Stomp.Commands
+{
+    /// <summary>
+    /// An Stomp command
+    /// </summary>
+    public interface Command : ICloneable
+    {
+        int CommandId
+        {
+            get;
+            set;
+        }
+
+        bool ResponseRequired
+        {
+            get;
+            set;
+        }
+
+        bool IsConnectionInfo
+        {
+            get;
+        }
+
+        bool IsConnectedCommand
+        {
+            get;
+        }
+
+        bool IsErrorCommand
+        {
+            get;
+        }
+
+        bool IsDestinationInfo
+        {
+            get;
+        }
+
+        bool IsMessage
+        {
+            get;
+        }
+
+        bool IsMessageAck
+        {
+            get;
+        }
+
+        bool IsMessageDispatch
+        {
+            get;
+        }
+
+        bool IsRemoveInfo
+        {
+            get;
+        }
+
+        bool IsRemoveSubscriptionInfo
+        {
+            get;
+        }
+
+        bool IsResponse
+        {
+            get;
+        }
+
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructure.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructure.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructure.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructure.cs Thu Dec  3 23:11:03 2009
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.Stomp.Commands
+{
+
+    /// <summary>
+    /// An OpenWire command
+    /// </summary>
+    public interface DataStructure : ICloneable
+    {
+        byte GetDataStructureType();
+        bool IsMarshallAware();
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructure.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructure.cs
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Response.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Response.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Response.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Response.cs Thu Dec  3 23:11:03 2009
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+
+namespace Apache.NMS.Stomp.Commands
+{
+
+    public class Response : BaseCommand
+    {
+        int correlationId;
+
+        ///
+        /// <summery>
+        ///  Returns a string containing the information for this DataStructure
+        ///  such as its type and value of its elements.
+        /// </summery>
+        ///
+        public override string ToString()
+        {
+            return GetType().Name + "[" +
+                "CorrelationId=" + CorrelationId +
+                "]";
+        }
+
+        public int CorrelationId
+        {
+            get { return correlationId; }
+            set { this.correlationId = value; }
+        }
+
+        ///
+        /// <summery>
+        ///  Return an answer of true to the isResponse() query.
+        /// </summery>
+        ///
+        public override bool IsResponse
+        {
+            get
+            {
+                return true;
+            }
+        }
+
+    };
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Response.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs Thu Dec  3 23:11:03 2009
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Stomp.Transport;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.Policies;
+
+namespace Apache.NMS.Stomp
+{
+    /// <summary>
+    /// Represents a connection with a message broker
+    /// </summary>
+    public class ConnectionFactory : IConnectionFactory
+    {
+        public const string DEFAULT_BROKER_URL = "tcp://localhost:61613";
+        public const string ENV_BROKER_URL = "STOMP_BROKER_URL";
+
+        private static event ExceptionListener onException;
+        private Uri brokerUri;
+        private string connectionUserName;
+        private string connectionPassword;
+        private string clientId;
+
+        private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+        private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
+
+        static ConnectionFactory()
+        {
+            TransportFactory.OnException += ConnectionFactory.ExceptionHandler;
+        }
+
+        public static string GetDefaultBrokerUrl()
+        {
+#if (PocketPC||NETCF||NETCF_2_0)
+            return DEFAULT_BROKER_URL;
+#else
+            return Environment.GetEnvironmentVariable(ENV_BROKER_URL) ?? DEFAULT_BROKER_URL;
+#endif
+        }
+
+        public ConnectionFactory()
+            : this(GetDefaultBrokerUrl())
+        {
+        }
+
+        public ConnectionFactory(string brokerUri)
+            : this(brokerUri, null)
+        {
+        }
+
+        public ConnectionFactory(string brokerUri, string clientID)
+            : this(new Uri(brokerUri), clientID)
+        {
+        }
+
+        public ConnectionFactory(Uri brokerUri)
+            : this(brokerUri, null)
+        {
+        }
+
+        public ConnectionFactory(Uri brokerUri, string clientID)
+        {
+            this.brokerUri = brokerUri;
+            this.clientId = clientID;
+        }
+
+        public IConnection CreateConnection()
+        {
+            return CreateConnection(connectionUserName, connectionPassword);
+        }
+
+        public IConnection CreateConnection(string userName, string password)
+        {
+            // Strip off the activemq prefix, if it exists.
+            Uri uri = new Uri(URISupport.stripPrefix(brokerUri.OriginalString, "activemq:"));
+
+            Tracer.InfoFormat("Connecting to: {0}", uri.ToString());
+
+            ConnectionInfo info = CreateConnectionInfo(userName, password);
+            ITransport transport = TransportFactory.CreateTransport(uri);
+            Connection connection = new Connection(uri, transport, info);
+
+            // Set the Factory level configuration to the Connection, this can be overriden by
+            // the params on the Connection URI so we do this before applying the params.
+            connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
+            connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
+
+            // Set properties on connection using parameters prefixed with "connection."
+            // Since this could be a composite Uri, assume the connection-specific parameters
+            // are associated with the outer-most specification of the composite Uri. What's nice
+            // is that this works with simple Uri as well.
+            URISupport.CompositeData c = URISupport.parseComposite(uri);
+            URISupport.SetProperties(connection, c.Parameters, "connection.");
+
+            connection.ITransport.Start();
+            return connection;
+        }
+
+        // Properties
+
+        /// <summary>
+        /// Get/or set the broker Uri.
+        /// </summary>
+        public Uri BrokerUri
+        {
+            get { return brokerUri; }
+            set { brokerUri = value; }
+        }
+
+        public string UserName
+        {
+            get { return connectionUserName; }
+            set { connectionUserName = value; }
+        }
+
+        public string Password
+        {
+            get { return connectionPassword; }
+            set { connectionPassword = value; }
+        }
+
+        public string ClientId
+        {
+            get { return clientId; }
+            set { clientId = value; }
+        }
+
+        public IRedeliveryPolicy RedeliveryPolicy
+        {
+            get { return this.redeliveryPolicy; }
+            set
+            {
+                if(value != null)
+                {
+                    this.redeliveryPolicy = value;
+                }
+            }
+        }
+
+        public event ExceptionListener OnException
+        {
+            add { onException += value; }
+            remove
+            {
+                if(onException != null)
+                {
+                    onException -= value;
+                }
+            }
+        }
+
+        protected virtual ConnectionInfo CreateConnectionInfo(string userName, string password)
+        {
+            ConnectionInfo answer = new ConnectionInfo();
+            ConnectionId connectionId = new ConnectionId();
+            connectionId.Value = CreateNewGuid();
+
+            answer.ConnectionId = connectionId;
+            answer.UserName = userName;
+            answer.Password = password;
+            answer.ClientId = clientId ?? CreateNewGuid();
+
+            return answer;
+        }
+
+        protected static string CreateNewGuid()
+        {
+            return Guid.NewGuid().ToString();
+        }
+
+        protected static void ExceptionHandler(Exception ex)
+        {
+            if(ConnectionFactory.onException != null)
+            {
+                ConnectionFactory.onException(ex);
+            }
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrameStream.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrameStream.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrameStream.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrameStream.cs Thu Dec  3 23:11:03 2009
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System.Reflection;
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Stomp.Transport;
+using Apache.NMS;
+using System;
+using System.Collections;
+using System.IO;
+using System.Text;
+
+namespace Apache.NMS.Stomp.Protocol
+{
+    /// <summary>
+    /// A Stream for writing a <a href="http://stomp.codehaus.org/">STOMP</a> Frame
+    /// </summary>
+    public class StompFrameStream
+    {
+        /// Used to terminate a header line or end of a headers section of the Frame.
+        public const String NEWLINE = "\n";
+        /// Used to seperate the Key / Value pairing in Frame Headers
+        public const String SEPARATOR = ":";
+        /// Used to mark the End of the Frame.
+        public const byte FRAME_TERMINUS = (byte) 0;
+
+        private StringBuilder builder = new StringBuilder();
+        private BinaryWriter ds;
+        private byte[] content;
+        private int contentLength = -1;
+        private Encoding encoding;
+
+        public StompFrameStream(BinaryWriter ds, Encoding encoding)
+        {
+            this.ds = ds;
+            this.encoding = encoding;
+        }
+
+        public byte[] Content
+        {
+            get { return content; }
+            set { content = value; }
+        }
+
+        public int ContentLength
+        {
+            get { return contentLength; }
+            set
+            {
+                contentLength = value;
+                WriteHeader("content-length", contentLength);
+            }
+        }
+
+        public void WriteCommand(Command command, String name)
+        {
+            WriteCommand(command, name, false);
+        }
+
+        public void WriteCommand(Command command, String name, bool ignoreErrors)
+        {
+            builder.Append(name);
+            builder.Append(NEWLINE);
+            if(command.ResponseRequired)
+            {
+                if(ignoreErrors)
+                {
+                    WriteHeader("receipt", "ignore:" + command.CommandId);
+                }
+                else
+                {
+                    WriteHeader("receipt", command.CommandId);
+                }
+            }
+        }
+
+        public void WriteHeader(String name, Object value)
+        {
+            if (value != null)
+            {
+                builder.Append(name);
+                builder.Append(SEPARATOR);
+                builder.Append(value);
+                builder.Append(NEWLINE);
+            }
+        }
+
+        public void WriteHeader(String name, bool value)
+        {
+            if (value)
+            {
+                builder.Append(name);
+                builder.Append(SEPARATOR);
+                builder.Append("true");
+                builder.Append(NEWLINE);
+            }
+        }
+
+        public void Flush()
+        {
+            builder.Append(NEWLINE);
+            ds.Write(encoding.GetBytes(builder.ToString()));
+
+            if (content != null)
+            {
+                ds.Write(content);
+            }
+
+            // Always write a terminating NULL byte to end the content frame.
+            ds.Write(FRAME_TERMINUS);
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrameStream.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompHelper.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompHelper.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompHelper.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompHelper.cs Thu Dec  3 23:11:03 2009
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Text;
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS;
+
+namespace Apache.NMS.Stomp.Protocol
+{
+    /// <summary>
+    /// Some <a href="http://stomp.codehaus.org/">STOMP</a> protocol conversion helper methods.
+    /// </summary>
+    public class StompHelper
+    {
+        private static int ParseInt(string text)
+        {
+            StringBuilder sbtext = new StringBuilder();
+
+            for(int idx = 0; idx < text.Length; idx++)
+            {
+                if(char.IsNumber(text, idx))
+                {
+                    sbtext.Append(text[idx]);
+                }
+                else
+                {
+                    break;
+                }
+            }
+
+            if(sbtext.Length > 0)
+            {
+                return Int32.Parse(sbtext.ToString());
+            }
+
+            return 0;
+        }
+
+        public static ActiveMQDestination ToDestination(string text)
+        {
+            if(text == null)
+            {
+                return null;
+            }
+
+            int type = ActiveMQDestination.ACTIVEMQ_QUEUE;
+            string lowertext = text.ToLower();
+            if(lowertext.StartsWith("/queue/"))
+            {
+                text = text.Substring("/queue/".Length);
+            }
+            else if(lowertext.StartsWith("/topic/"))
+            {
+                text = text.Substring("/topic/".Length);
+                type = ActiveMQDestination.ACTIVEMQ_TOPIC;
+            }
+            else if(lowertext.StartsWith("/temp-topic/"))
+            {
+                text = text.Substring("/temp-topic/".Length);
+                type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_TOPIC;
+            }
+            else if(lowertext.StartsWith("/temp-queue/"))
+            {
+                text = text.Substring("/temp-queue/".Length);
+                type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_QUEUE;
+            }
+            else if(lowertext.StartsWith("/remote-temp-topic/"))
+            {
+                type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_TOPIC;
+            }
+            else if(lowertext.StartsWith("/remote-temp-queue/"))
+            {
+                type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_QUEUE;
+            }
+
+            return ActiveMQDestination.CreateDestination(type, text);
+        }
+
+        public static string ToStomp(ActiveMQDestination destination)
+        {
+            if(destination == null)
+            {
+                return null;
+            }
+
+            switch (destination.DestinationType)
+            {
+                case DestinationType.Topic:
+                    return "/topic/" + destination.PhysicalName;
+
+                case DestinationType.TemporaryTopic:
+                    if (destination.PhysicalName.ToLower().StartsWith("/remote-temp-topic/"))
+                    {
+                        return destination.PhysicalName;
+                    }
+
+                    return "/temp-topic/" + destination.PhysicalName;
+
+                case DestinationType.TemporaryQueue:
+                    if (destination.PhysicalName.ToLower().StartsWith("/remote-temp-queue/"))
+                    {
+                        return destination.PhysicalName;
+                    }
+
+                    return "/temp-queue/" + destination.PhysicalName;
+
+                default:
+                    return "/queue/" + destination.PhysicalName;
+            }
+        }
+
+        public static string ToStomp(ConsumerId id)
+        {
+            return id.ConnectionId + ":" + id.SessionId + ":" + id.Value;
+        }
+
+        public static ConsumerId ToConsumerId(string text)
+        {
+            if(text == null)
+            {
+                return null;
+            }
+
+            ConsumerId answer = new ConsumerId();
+            int idx = text.LastIndexOf(':');
+            if (idx >= 0)
+            {
+                try
+                {
+                    answer.Value = ParseInt(text.Substring(idx + 1));
+                    text = text.Substring(0, idx);
+                    idx = text.LastIndexOf(':');
+                    if (idx >= 0)
+                    {
+                        try
+                        {
+                            answer.SessionId = ParseInt(text.Substring(idx + 1));
+                            text = text.Substring(0, idx);
+                        }
+                        catch(Exception ex)
+                        {
+                            Tracer.Debug(ex.Message);
+                        }
+                    }
+                }
+                catch(Exception ex)
+                {
+                    Tracer.Debug(ex.Message);
+                }
+            }
+            answer.ConnectionId = text;
+            return answer;
+        }
+
+        public static string ToStomp(ProducerId id)
+        {
+            StringBuilder producerBuilder = new StringBuilder();
+
+            producerBuilder.Append(id.ConnectionId);
+            producerBuilder.Append(":");
+            producerBuilder.Append(id.SessionId);
+            producerBuilder.Append(":");
+            producerBuilder.Append(id.Value);
+
+            return producerBuilder.ToString();
+        }
+
+        public static ProducerId ToProducerId(string text)
+        {
+            if(text == null)
+            {
+                return null;
+            }
+
+            ProducerId answer = new ProducerId();
+            int idx = text.LastIndexOf(':');
+            if(idx >= 0)
+            {
+                try
+                {
+                    answer.Value = ParseInt(text.Substring(idx + 1));
+                    text = text.Substring(0, idx);
+                    idx = text.LastIndexOf(':');
+                    if (idx >= 0)
+                    {
+                        answer.SessionId = ParseInt(text.Substring(idx + 1));
+                        text = text.Substring(0, idx);
+                    }
+                }
+                catch(Exception ex)
+                {
+                    Tracer.Debug(ex.Message);
+                }
+            }
+            answer.ConnectionId = text;
+            return answer;
+        }
+
+        public static string ToStomp(MessageId id)
+        {
+            StringBuilder messageBuilder = new StringBuilder();
+
+            messageBuilder.Append(ToStomp(id.ProducerId));
+            messageBuilder.Append(":");
+            messageBuilder.Append(id.ProducerSequenceId);
+
+            return messageBuilder.ToString();
+        }
+
+        public static MessageId ToMessageId(string text)
+        {
+            if(text == null)
+            {
+                return null;
+            }
+
+            MessageId answer = new MessageId();
+            int idx = text.LastIndexOf(':');
+            if (idx >= 0)
+            {
+                try
+                {
+                    answer.ProducerSequenceId = ParseInt(text.Substring(idx + 1));
+                    text = text.Substring(0, idx);
+                }
+                catch(Exception ex)
+                {
+                    Tracer.Debug(ex.Message);
+                }
+            }
+            answer.ProducerId = ToProducerId(text);
+            return answer;
+        }
+
+        public static string ToStomp(TransactionId id)
+        {
+            if(id is LocalTransactionId)
+            {
+                return ToStomp(id as LocalTransactionId);
+            }
+
+            return id.ToString();
+        }
+
+        public static string ToStomp(LocalTransactionId transactionId)
+        {
+            return transactionId.ConnectionId.Value + ":" + transactionId.Value;
+        }
+
+        public static bool ToBool(string text, bool defaultValue)
+        {
+            if(text == null)
+            {
+                return defaultValue;
+            }
+
+            return (0 == string.Compare("true", text, true));
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompHelper.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs Thu Dec  3 23:11:03 2009
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Stomp.Transport;
+using Apache.NMS;
+using System;
+using System.Collections;
+using System.IO;
+using System.Text;
+
+namespace Apache.NMS.Stomp.Protocol
+{
+    /// <summary>
+    /// Implements the <a href="http://stomp.codehaus.org/">STOMP</a> protocol.
+    /// </summary>
+    public class StompWireFormat : IWireFormat
+    {
+        private Encoding encoding = new UTF8Encoding();
+        private ITransport transport;
+        private IDictionary consumers = Hashtable.Synchronized(new Hashtable());
+
+        public StompWireFormat()
+        {
+        }
+
+        public ITransport Transport {
+            get { return transport; }
+            set { transport = value; }
+        }
+
+        public int Version {
+            get { return 1; }
+        }
+
+        public void Marshal(Object o, BinaryWriter binaryWriter)
+        {
+            Tracer.Debug(">>>> " + o);
+            StompFrameStream ds = new StompFrameStream(binaryWriter, encoding);
+
+            if (o is ConnectionInfo)
+            {
+                WriteConnectionInfo((ConnectionInfo) o, ds);
+            }
+            else if (o is ActiveMQMessage)
+            {
+                WriteMessage((ActiveMQMessage) o, ds);
+            }
+            else if (o is ConsumerInfo)
+            {
+                WriteConsumerInfo((ConsumerInfo) o, ds);
+            }
+            else if (o is MessageAck)
+            {
+                WriteMessageAck((MessageAck) o, ds);
+            }
+            else if (o is TransactionInfo)
+            {
+                WriteTransactionInfo((TransactionInfo) o, ds);
+            }
+            else if (o is ShutdownInfo)
+            {
+                WriteShutdownInfo((ShutdownInfo) o, ds);
+            }
+            else if (o is RemoveInfo)
+            {
+                WriteRemoveInfo((RemoveInfo) o, ds);
+            }
+            else if (o is Command)
+            {
+                Command command = o as Command;
+                if (command.ResponseRequired)
+                {
+                    Response response = new Response();
+                    response.CorrelationId = command.CommandId;
+                    SendCommand(response);
+                    Tracer.Debug("#### Autorespond to command: " + o.GetType());
+                }
+            }
+            else
+            {
+                Tracer.Debug("#### Ignored command: " + o.GetType());
+            }
+        }
+
+
+        internal String ReadLine(BinaryReader dis)
+        {
+            MemoryStream ms = new MemoryStream();
+            while (true)
+            {
+                int nextChar = dis.Read();
+                if (nextChar < 0)
+                {
+                    throw new IOException("Peer closed the stream.");
+                }
+                if( nextChar == 10 )
+                {
+                    break;
+                }
+                ms.WriteByte((byte)nextChar);
+            }
+            byte[] data = ms.ToArray();
+            return encoding.GetString(data, 0, data.Length);
+        }
+
+        public Object Unmarshal(BinaryReader dis)
+        {
+            string command;
+            do {
+                command = ReadLine(dis);
+            }
+            while (command == "");
+
+            Tracer.Debug("<<<< command: " + command);
+
+            IDictionary headers = new Hashtable();
+            string line;
+            while ((line = ReadLine(dis)) != "")
+            {
+                int idx = line.IndexOf(':');
+                if (idx > 0)
+                {
+                    string key = line.Substring(0, idx);
+                    string value = line.Substring(idx + 1);
+                    headers[key] = value;
+
+                    Tracer.Debug("<<<< header: " + key + " = " + value);
+                }
+                else
+                {
+                    // lets ignore this bad header!
+                }
+            }
+            byte[] content = null;
+            string length = ToString(headers["content-length"]);
+            if (length != null)
+            {
+                int size = Int32.Parse(length);
+                content = dis.ReadBytes(size);
+                // Read the terminating NULL byte for this frame.
+                int nullByte = dis.Read();
+                if(nullByte != 0)
+                {
+                    Tracer.Debug("<<<< error reading frame null byte.");
+                }
+            }
+            else
+            {
+                MemoryStream ms = new MemoryStream();
+                int nextChar;
+                while((nextChar = dis.Read()) != 0)
+                {
+                    if( nextChar < 0 )
+                    {
+                        // EOF ??
+                        break;
+                    }
+                    ms.WriteByte((byte)nextChar);
+                }
+                content = ms.ToArray();
+            }
+            Object answer = CreateCommand(command, headers, content);
+            Tracer.Debug("<<<< received: " + answer);
+            return answer;
+        }
+
+        protected virtual Object CreateCommand(string command, IDictionary headers, byte[] content)
+        {
+            if(command == "RECEIPT" || command == "CONNECTED")
+            {
+                string text = RemoveHeader(headers, "receipt-id");
+                if(text != null)
+                {
+                    Response answer = new Response();
+                    if(text.StartsWith("ignore:"))
+                    {
+                        text = text.Substring("ignore:".Length);
+                    }
+
+                    answer.CorrelationId = Int32.Parse(text);
+                    return answer;
+                }
+                else if(command == "CONNECTED")
+                {
+                    text = RemoveHeader(headers, "response-id");
+                    if (text != null)
+                    {
+                        Response answer = new Response();
+                        answer.CorrelationId = Int32.Parse(text);
+                        return answer;
+                    }
+                }
+            }
+            else if(command == "ERROR")
+            {
+                string text = RemoveHeader(headers, "receipt-id");
+
+                if(text != null && text.StartsWith("ignore:"))
+                {
+                    Response answer = new Response();
+                    answer.CorrelationId = Int32.Parse(text.Substring("ignore:".Length));
+                    return answer;
+                }
+                else
+                {
+                    ExceptionResponse answer = new ExceptionResponse();
+                    if(text != null)
+                    {
+                        answer.CorrelationId = Int32.Parse(text);
+                    }
+
+                    BrokerError error = new BrokerError();
+                    error.Message = RemoveHeader(headers, "message");
+                    error.ExceptionClass = RemoveHeader(headers, "exceptionClass");
+                    // TODO is this the right header?
+                    answer.Exception = error;
+                    return answer;
+                }
+            }
+            else if (command == "MESSAGE")
+            {
+                return ReadMessage(command, headers, content);
+            }
+            Tracer.Error("Unknown command: " + command + " headers: " + headers);
+            return null;
+        }
+
+        protected virtual Command ReadMessage(string command, IDictionary headers, byte[] content)
+        {
+            ActiveMQMessage message = null;
+            if (headers.Contains("content-length"))
+            {
+                message = new ActiveMQBytesMessage();
+                message.Content = content;
+            }
+            else
+            {
+                message = new ActiveMQTextMessage(encoding.GetString(content, 0, content.Length));
+            }
+
+            // TODO now lets set the various headers
+
+            message.Type = RemoveHeader(headers, "type");
+            message.Destination = StompHelper.ToDestination(RemoveHeader(headers, "destination"));
+            message.ReplyTo = StompHelper.ToDestination(RemoveHeader(headers, "reply-to"));
+            message.TargetConsumerId = StompHelper.ToConsumerId(RemoveHeader(headers, "subscription"));
+            message.CorrelationId = RemoveHeader(headers, "correlation-id");
+            message.MessageId = StompHelper.ToMessageId(RemoveHeader(headers, "message-id"));
+            message.Persistent = StompHelper.ToBool(RemoveHeader(headers, "persistent"), true);
+
+            string header = RemoveHeader(headers, "priority");
+            if (header != null) message.Priority = Byte.Parse(header);
+
+            header = RemoveHeader(headers, "timestamp");
+            if (header != null) message.Timestamp = Int64.Parse(header);
+
+            header = RemoveHeader(headers, "expires");
+            if (header != null) message.Expiration = Int64.Parse(header);
+
+            // now lets add the generic headers
+            foreach (string key in headers.Keys)
+            {
+                Object value = headers[key];
+                if (value != null)
+                {
+                    // lets coerce some standard header extensions
+                    if (key == "NMSXGroupSeq")
+                    {
+                        value = Int32.Parse(value.ToString());
+                    }
+                }
+                message.Properties[key] = value;
+            }
+            MessageDispatch dispatch = new MessageDispatch();
+            dispatch.Message = message;
+            dispatch.ConsumerId = message.TargetConsumerId;
+            dispatch.Destination = message.Destination;
+            return dispatch;
+        }
+
+        protected virtual void WriteConnectionInfo(ConnectionInfo command, StompFrameStream ss)
+        {
+            // lets force a receipt
+            command.ResponseRequired = true;
+
+            ss.WriteCommand(command, "CONNECT");
+            ss.WriteHeader("client-id", command.ClientId);
+            ss.WriteHeader("login", command.UserName);
+            ss.WriteHeader("passcode", command.Password);
+
+            if (command.ResponseRequired)
+            {
+                ss.WriteHeader("request-id", command.CommandId);
+            }
+
+            ss.Flush();
+        }
+
+        protected virtual void WriteShutdownInfo(ShutdownInfo command, StompFrameStream ss)
+        {
+            ss.WriteCommand(command, "DISCONNECT");
+            System.Diagnostics.Debug.Assert(!command.ResponseRequired);
+            ss.Flush();
+        }
+
+        protected virtual void WriteConsumerInfo(ConsumerInfo command, StompFrameStream ss)
+        {
+            ss.WriteCommand(command, "SUBSCRIBE");
+            ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
+            ss.WriteHeader("id", StompHelper.ToStomp(command.ConsumerId));
+            ss.WriteHeader("durable-subscriber-name", command.SubscriptionName);
+            ss.WriteHeader("selector", command.Selector);
+            if ( command.NoLocal )
+                ss.WriteHeader("no-local", command.NoLocal);
+            ss.WriteHeader("ack", "client");
+
+            // ActiveMQ extensions to STOMP
+            ss.WriteHeader("activemq.dispatchAsync", command.DispatchAsync);
+            if ( command.Exclusive )
+                ss.WriteHeader("activemq.exclusive", command.Exclusive);
+
+            if( command.SubscriptionName != null )
+            {
+                ss.WriteHeader("activemq.subscriptionName", command.SubscriptionName);
+                // For an older 4.0 broker we need to set this header so they get the
+                // subscription as wel..
+                ss.WriteHeader("activemq.subcriptionName", command.SubscriptionName);
+            }
+
+            ss.WriteHeader("activemq.maximumPendingMessageLimit", command.MaximumPendingMessageLimit);
+            ss.WriteHeader("activemq.prefetchSize", command.PrefetchSize);
+            ss.WriteHeader("activemq.priority", command.Priority);
+            if ( command.Retroactive )
+                ss.WriteHeader("activemq.retroactive", command.Retroactive);
+
+            consumers[command.ConsumerId] = command.ConsumerId;
+            ss.Flush();
+        }
+
+        protected virtual void WriteRemoveInfo(RemoveInfo command, StompFrameStream ss)
+        {
+            object id = command.ObjectId;
+
+            if (id is ConsumerId)
+            {
+                ConsumerId consumerId = id as ConsumerId;
+                ss.WriteCommand(command, "UNSUBSCRIBE");
+                ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
+                ss.Flush();
+                consumers.Remove(consumerId);
+            }
+            else if (id is SessionId)
+            {
+                // When a session is removed, it needs to remove it's consumers too.
+                // Find all the consumer that were part of the session.
+                SessionId sessionId = (SessionId) id;
+                ArrayList matches = new ArrayList();
+                foreach (DictionaryEntry entry in consumers)
+                {
+                    ConsumerId t = (ConsumerId) entry.Key;
+                    if( sessionId.ConnectionId==t.ConnectionId && sessionId.Value==t.SessionId )
+                    {
+                        matches.Add(t);
+                    }
+                }
+
+                bool unsubscribedConsumer = false;
+
+                // Un-subscribe them.
+                foreach (ConsumerId consumerId in matches)
+                {
+                    ss.WriteCommand(command, "UNSUBSCRIBE");
+                    ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
+                    ss.Flush();
+                    consumers.Remove(consumerId);
+                    unsubscribedConsumer = true;
+                }
+
+                if(!unsubscribedConsumer && command.ResponseRequired)
+                {
+                    ss.WriteCommand(command, "UNSUBSCRIBE", true);
+                    ss.WriteHeader("id", sessionId);
+                    ss.Flush();
+                }
+            }
+            else if(id is ProducerId)
+            {
+                if(command.ResponseRequired)
+                {
+                    ss.WriteCommand(command, "UNSUBSCRIBE", true);
+                    ss.WriteHeader("id", id);
+                    ss.Flush();
+                }
+            }
+            else if(id is ConnectionId)
+            {
+                if(command.ResponseRequired)
+                {
+                    ss.WriteCommand(command, "UNSUBSCRIBE", true);
+                    ss.WriteHeader("id", id);
+                    ss.Flush();
+                }
+            }
+        }
+
+
+        protected virtual void WriteTransactionInfo(TransactionInfo command, StompFrameStream ss)
+        {
+            TransactionId id = command.TransactionId;
+            if (id is LocalTransactionId)
+            {
+                string type = "BEGIN";
+                TransactionType transactionType = (TransactionType) command.Type;
+                switch (transactionType)
+                {
+                    case TransactionType.CommitOnePhase:
+                        command.ResponseRequired = true;
+                        type = "COMMIT";
+                        break;
+                    case TransactionType.Rollback:
+                        command.ResponseRequired = true;
+                        type = "ABORT";
+                        break;
+                }
+
+                Tracer.Debug(">>> For transaction type: " + transactionType + " we are using command type: " + type);
+                ss.WriteCommand(command, type);
+                ss.WriteHeader("transaction", StompHelper.ToStomp(id));
+                ss.Flush();
+            }
+        }
+
+        protected virtual void WriteMessage(ActiveMQMessage command, StompFrameStream ss)
+        {
+            ss.WriteCommand(command, "SEND");
+            ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
+            if (command.ReplyTo != null)
+                ss.WriteHeader("reply-to", StompHelper.ToStomp(command.ReplyTo));
+            if (command.CorrelationId != null )
+                ss.WriteHeader("correlation-id", command.CorrelationId);
+            if (command.Expiration != 0)
+                ss.WriteHeader("expires", command.Expiration);
+            if (command.Priority != 4)
+                ss.WriteHeader("priority", command.Priority);
+            if (command.Type != null)
+                ss.WriteHeader("type", command.Type);
+            if (command.TransactionId!=null)
+                ss.WriteHeader("transaction", StompHelper.ToStomp(command.TransactionId));
+
+            ss.WriteHeader("persistent", command.Persistent);
+
+            // lets force the content to be marshalled
+
+            command.BeforeMarshall(null);
+            if (command is ActiveMQTextMessage)
+            {
+                ActiveMQTextMessage textMessage = command as ActiveMQTextMessage;
+                ss.Content = encoding.GetBytes(textMessage.Text);
+            }
+            else
+            {
+                ss.Content = command.Content;
+                if(null != command.Content)
+                {
+                    ss.ContentLength = command.Content.Length;
+                }
+                else
+                {
+                    ss.ContentLength = 0;
+                }
+            }
+
+            IPrimitiveMap map = command.Properties;
+            foreach (string key in map.Keys)
+            {
+                ss.WriteHeader(key, map[key]);
+            }
+            ss.Flush();
+        }
+
+        protected virtual void WriteMessageAck(MessageAck command, StompFrameStream ss)
+        {
+            ss.WriteCommand(command, "ACK", true);
+
+            // TODO handle bulk ACKs?
+            ss.WriteHeader("message-id", StompHelper.ToStomp(command.LastMessageId));
+            if(command.TransactionId != null)
+            {
+                ss.WriteHeader("transaction", StompHelper.ToStomp(command.TransactionId));
+            }
+
+            ss.Flush();
+        }
+
+        protected virtual void SendCommand(Command command)
+        {
+            if (transport == null)
+            {
+                Tracer.Fatal("No transport configured so cannot return command: " + command);
+            }
+            else
+            {
+                transport.Command(transport, command);
+            }
+        }
+
+        protected virtual string RemoveHeader(IDictionary headers, string name)
+        {
+            object value = headers[name];
+            if (value == null)
+            {
+                return null;
+            }
+            else
+            {
+                headers.Remove(name);
+                return value.ToString();
+            }
+        }
+
+
+        protected virtual string ToString(object value)
+        {
+            if (value != null)
+            {
+                return value.ToString();
+            }
+            else
+            {
+                return null;
+            }
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs Thu Dec  3 23:11:03 2009
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Apache.NMS.Stomp.Commands;
+using System;
+using System.Threading;
+using Apache.NMS;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Stomp.Transport
+{
+	/// <summary>
+	/// Handles asynchronous responses
+	/// </summary>
+	public class FutureResponse
+	{
+		private static TimeSpan maxWait = TimeSpan.FromMilliseconds(Timeout.Infinite);
+		public TimeSpan ResponseTimeout
+		{
+			get { return maxWait; }
+			set { maxWait = value; }
+		}
+
+		private readonly CountDownLatch latch = new CountDownLatch(1);
+		private Response response;
+
+		public WaitHandle AsyncWaitHandle
+		{
+			get { return latch.AsyncWaitHandle; }
+		}
+
+		public Response Response
+		{
+			// Blocks the caller until a value has been set
+			get
+			{
+				bool waitForResponse = false;
+
+				lock(latch)
+				{
+					if(null == response)
+					{
+						waitForResponse = true;
+					}
+				}
+
+				if(waitForResponse)
+				{
+					try
+					{
+						if(!latch.await(maxWait))
+						{
+							// TODO: Throw timeout exception?
+						}
+					}
+					catch (Exception e)
+					{
+						Tracer.Error("Caught while waiting on monitor: " + e);
+					}
+				}
+
+				lock(latch)
+				{
+					return response;
+				}
+			}
+
+			set
+			{
+				lock(latch)
+				{
+					response = value;
+				}
+
+				latch.countDown();
+			}
+		}
+	}
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs Thu Dec  3 23:11:03 2009
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.Stomp.Commands;
+
+namespace Apache.NMS.Stomp.Transport
+{
+	public delegate void CommandHandler(ITransport sender, Command command);
+	public delegate void ExceptionHandler(ITransport sender, Exception command);
+	public delegate void InterruptedHandler(ITransport sender);
+	public delegate void ResumedHandler(ITransport sender);
+
+	/// <summary>
+	/// Represents the logical networking transport layer.  Transports implment the low
+    /// level protocol specific portion of the Communication between the Client and a Broker
+    /// such as TCP, UDP, etc.  Transports make use of WireFormat objects to handle translateing
+    /// the cononical OpenWire Commands used in this client into binary wire level packets that
+    /// can be sent to the Broker or Service that the Transport connects to.
+	/// </summary>
+	public interface ITransport : IStartable, IDisposable, IStoppable
+	{
+        /// <summary>
+        /// Sends a Command object on the Wire but does not wait for any response from the
+        /// receiver before returning.  
+        /// </summary>
+        /// <param name="command">
+        /// A <see cref="Command"/>
+        /// </param>
+		void Oneway(Command command);
+
+        /// <summary>
+        /// Sends a Command object which requires a response from the Broker but does not
+        /// wait for the response, instead a FutureResponse object is returned that the
+        /// caller can use to wait on the Broker's response.
+        /// </summary>
+        /// <param name="command">
+        /// A <see cref="Command"/>
+        /// </param>
+        /// <returns>
+        /// A <see cref="FutureResponse"/>
+        /// </returns>
+		FutureResponse AsyncRequest(Command command);
+
+        /// <summary>
+        /// Sends a Command to the Broker and waits for a Response to that Command before
+        /// returning, this version waits indefinitely for a response.
+        /// </summary>
+        /// <param name="command">
+        /// A <see cref="Command"/>
+        /// </param>
+        /// <returns>
+        /// A <see cref="Response"/>
+        /// </returns>
+		Response Request(Command command);
+
+        /// <summary>
+        /// Sends a Command to the Broker and waits for the given TimeSpan to expire for a
+        /// response before returning.  
+        /// </summary>
+        /// <param name="command">
+        /// A <see cref="Command"/>
+        /// </param>
+        /// <param name="timeout">
+        /// A <see cref="TimeSpan"/>
+        /// </param>
+        /// <returns>
+        /// A <see cref="Response"/>
+        /// </returns>
+		Response Request(Command command, TimeSpan timeout);
+
+        /// <summary>
+        /// Allows a caller to find a specific type of Transport in the Chain of
+        /// Transports that is created.  This allows a caller to find a specific
+        /// object in the Transport chain and set or get properties on that specific
+        /// instance.  If the requested type isn't in the chain than Null is returned.
+        /// </summary>
+        /// <param name="type">
+        /// A <see cref="Type"/>
+        /// </param>
+        /// <returns>
+        /// A <see cref="System.Object"/>
+        /// </returns>
+        Object Narrow(Type type);            
+        
+        /// <value>
+        /// The time that the Transport waits before considering a request to have
+        /// failed and throwing an exception.
+        /// </value>
+        TimeSpan RequestTimeout
+        {
+            get;
+            set;
+        }
+
+		CommandHandler Command
+		{
+			get;
+			set;
+		}
+
+		ExceptionHandler Exception
+		{
+			get;
+			set;
+		}
+
+		InterruptedHandler Interrupted
+		{
+			get;
+			set;
+		}		
+
+		ResumedHandler Resumed
+		{
+			get;
+			set;
+		}		
+
+        /// <value>
+        /// Indicates if this Transport has already been disposed and can no longer
+        /// be used.
+        /// </value>
+		bool IsDisposed
+		{
+			get;
+		}
+
+        /// <value>
+        /// Indicates if this Transport is Fault Tolerant or not.  A fault Tolerant
+        /// Transport handles low level connection errors internally allowing a client
+        /// to remain unaware of wire level disconnection and reconnection details.
+        /// </value>
+        bool IsFaultTolerant
+        {
+            get;
+        }
+
+        /// <value>
+        /// Indiciates if the Transport is current Connected to is assigned URI.
+        /// </value>
+        bool IsConnected
+        {
+            get;
+        }
+
+        /// <value>
+        /// The Remote Address that this transport is currently connected to.
+        /// </value>
+        Uri RemoteAddress
+        {
+            get;
+        }
+        
+	}
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransportFactory.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransportFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransportFactory.cs Thu Dec  3 23:11:03 2009
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.Stomp.Transport
+{
+	public delegate void SetTransport(ITransport transport, Uri uri);
+
+	public interface ITransportFactory
+	{
+		ITransport CreateTransport(Uri location);
+		ITransport CompositeConnect(Uri location);
+		ITransport CompositeConnect(Uri location, SetTransport setTransport);
+	}
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransportFactory.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/IWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/IWireFormat.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/IWireFormat.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/IWireFormat.cs Thu Dec  3 23:11:03 2009
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.IO;
+
+namespace Apache.NMS.Stomp.Transport
+{
+    /// <summary>
+    /// Represents the marshalling of commands to and from an IO stream
+    /// </summary>
+    public interface IWireFormat
+    {
+        /// <summary>
+        /// Marshalls the given command object onto the stream
+        /// </summary>
+        void Marshal(Object o, BinaryWriter ds);
+
+        /// <summary>
+        /// Unmarshalls the next command object from the stream
+        /// </summary>
+        Object Unmarshal(BinaryReader dis);
+
+        ITransport Transport {
+            get; set;
+        }
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/IWireFormat.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/LoggingTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/LoggingTransport.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/LoggingTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/LoggingTransport.cs Thu Dec  3 23:11:03 2009
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Apache.NMS;
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Stomp.Transport;
+using System;
+
+namespace Apache.NMS.Stomp.Transport
+{
+	
+	/// <summary>
+	/// A Transport filter that is used to log the commands sent and received.
+	/// </summary>
+	public class LoggingTransport : TransportFilter
+    {
+		public LoggingTransport(ITransport next) : base(next) {
+		}
+		
+		protected override void OnCommand(ITransport sender, Command command) {
+			Tracer.Info("RECEIVED: " + command);
+			this.commandHandler(sender, command);
+		}
+		
+		protected override void OnException(ITransport sender, Exception error) {
+			Tracer.Error("RECEIVED Exception: " + error);
+			this.exceptionHandler(sender, error);
+		}
+		
+		public override void Oneway(Command command)
+		{
+			Tracer.Info("SENDING: " + command);
+			this.next.Oneway(command);
+		}
+				
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/LoggingTransport.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/MutexTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/MutexTransport.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/MutexTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/MutexTransport.cs Thu Dec  3 23:11:03 2009
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Stomp.Transport;
+using System;
+
+namespace Apache.NMS.Stomp.Transport
+{
+	/// <summary>
+	/// A Transport which guards access to the next transport using a mutex.
+	/// </summary>
+	public class MutexTransport : TransportFilter
+	{
+		private readonly object transmissionLock = new object();
+
+		public MutexTransport(ITransport next) : base(next)
+		{
+		}
+
+		public override void Oneway(Command command)
+		{
+			lock(transmissionLock)
+			{
+				this.next.Oneway(command);
+			}
+		}
+
+		public override FutureResponse AsyncRequest(Command command)
+		{
+			lock(transmissionLock)
+			{
+				return base.AsyncRequest(command);
+			}
+		}
+
+		public override Response Request(Command command, TimeSpan timeout)
+		{
+			lock(transmissionLock)
+			{
+				return base.Request(command, timeout);
+			}
+		}
+	}
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/MutexTransport.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ResponseCorrelator.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ResponseCorrelator.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ResponseCorrelator.cs Thu Dec  3 23:11:03 2009
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Collections;
+
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Stomp.Transport;
+using Apache.NMS;
+
+namespace Apache.NMS.Stomp.Transport
+{
+	/// <summary>
+	/// A Transport that correlates asynchronous send/receive messages into single request/response.
+	/// </summary>
+	public class ResponseCorrelator : TransportFilter
+	{
+		private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable());
+		private int nextCommandId;
+
+		public ResponseCorrelator(ITransport next) : base(next)
+		{
+		}
+
+		protected override void OnException(ITransport sender, Exception command)
+		{
+			base.OnException(sender, command);
+
+			foreach(DictionaryEntry entry in requestMap)
+			{
+				FutureResponse value = (FutureResponse) entry.Value;
+				ExceptionResponse response = new ExceptionResponse();
+				BrokerError error = new BrokerError();
+
+				error.Message = command.Message;
+				response.Exception = error;
+				value.Response = response;
+			}
+
+			requestMap.Clear();
+		}
+
+		internal int GetNextCommandId()
+		{
+            return Interlocked.Increment(ref nextCommandId);
+		}
+
+		public override void Oneway(Command command)
+		{
+			if(0 == command.CommandId)
+			{
+				command.CommandId = GetNextCommandId();
+			}
+
+			next.Oneway(command);
+		}
+
+		public override FutureResponse AsyncRequest(Command command)
+		{
+			int commandId = GetNextCommandId();
+
+			command.CommandId = commandId;
+			command.ResponseRequired = true;
+			FutureResponse future = new FutureResponse();
+			requestMap[commandId] = future;
+			next.Oneway(command);
+			return future;
+		}
+
+		public override Response Request(Command command, TimeSpan timeout)
+		{
+			FutureResponse future = AsyncRequest(command);
+			future.ResponseTimeout = timeout;
+			Response response = future.Response;
+
+			if(response != null && response is ExceptionResponse)
+			{
+				ExceptionResponse er = (ExceptionResponse) response;
+				BrokerError brokerError = er.Exception;
+
+				if (brokerError == null)
+				{
+					throw new BrokerException();
+				}
+				else
+				{
+					throw new BrokerException(brokerError);
+				}
+			}
+
+			return response;
+		}
+
+		protected override void OnCommand(ITransport sender, Command command)
+		{
+			if(command is Response)
+			{
+				Response response = (Response) command;
+				int correlationId = response.CorrelationId;
+				FutureResponse future = (FutureResponse) requestMap[correlationId];
+				
+				if(future != null)
+				{
+					requestMap.Remove(correlationId);
+					future.Response = response;
+
+					if(response is ExceptionResponse)
+					{
+						ExceptionResponse er = (ExceptionResponse) response;
+						BrokerError brokerError = er.Exception;
+						BrokerException exception = new BrokerException(brokerError);
+						this.exceptionHandler(this, exception);
+					}
+				}
+				else
+				{
+					Tracer.Error("Unknown response ID: " + response.CommandId + " for response: " + response);
+				}
+			}
+			else if(command is ShutdownInfo)
+			{
+				// lets shutdown
+				this.commandHandler(sender, command);
+			}
+			else
+			{
+				this.commandHandler(sender, command);
+			}
+		}
+	}
+}
+
+

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
------------------------------------------------------------------------------
    svn:eol-style = native