You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/12/04 00:11:06 UTC
svn commit: r886978 [1/2] - in
/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp: ./
Commands/ Protocol/ Transport/ Transport/Tcp/ Util/
Author: tabish
Date: Thu Dec 3 23:11:03 2009
New Revision: 886978
URL: http://svn.apache.org/viewvc?rev=886978&view=rev
Log:
Commit a bunch of initial work on porting the Stomp bits plus other useful classes over from NMS.ActiveMQ. None of this compiles yet.
Added:
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseDataStructure.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructure.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Response.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrameStream.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompHelper.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransportFactory.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/IWireFormat.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/LoggingTransport.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/MutexTransport.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ResponseCorrelator.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFactory.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFilter.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Util/MessageDispatchChannel.cs (with props)
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs Thu Dec 3 23:11:03 2009
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//
+// Marshalling code for Open Wire Format for BaseCommand
+//
+//
+// NOTE!: This file is autogenerated - do not modify!
+// if you need to make a change, please see the Groovy scripts in the
+// activemq-openwire module
+//
+
+using System;
+
+namespace Apache.NMS.Stomp.Commands
+{
+ /// <summary>
+ /// Base class for commands. Holds the command id and the response-required
+ /// flag and answers false to every Is* type query; the queries are virtual
+ /// so that a concrete command can override the ones that describe it.
+ /// </summary>
+ public abstract class BaseCommand : BaseDataStructure, Command, ICloneable
+ {
+ private int commandId;
+ private bool responseRequired = false;
+
+ public int CommandId
+ {
+ get { return commandId; }
+ set { this.commandId = value; }
+ }
+
+ /// <summary>
+ /// Hash derived from the command id and the data structure type, the
+ /// same two values that Equals() compares.
+ /// </summary>
+ public override int GetHashCode()
+ {
+ return (CommandId * 37) + GetDataStructureType();
+ }
+
+ /// <summary>
+ /// A command equals another BaseCommand that has the same data structure
+ /// type and the same command id. Null and foreign types are never equal.
+ /// </summary>
+ public override bool Equals(Object that)
+ {
+ BaseCommand thatCommand = that as BaseCommand;
+ if(thatCommand == null)
+ {
+ return false;
+ }
+
+ return this.GetDataStructureType() == thatCommand.GetDataStructureType()
+ && this.CommandId == thatCommand.CommandId;
+ }
+
+ /// <summary>
+ /// Returns the name reported by GetDataStructureTypeAsString() for this
+ /// command's type, or base.ToString() when that name is empty, followed
+ /// by ": id = " and the command id.
+ /// </summary>
+ public override String ToString()
+ {
+ string answer = GetDataStructureTypeAsString(GetDataStructureType());
+ if(answer.Length == 0)
+ {
+ answer = base.ToString();
+ }
+ return answer + ": id = " + CommandId;
+ }
+
+ /// <summary>
+ /// Maps a data structure type code to a display name.
+ /// Returns an empty string for a code that is not listed.
+ /// </summary>
+ public static String GetDataStructureTypeAsString(int type)
+ {
+ switch(type)
+ {
+ case ActiveMQMessage.ID_ACTIVEMQMESSAGE:
+ return "ACTIVEMQ_MESSAGE";
+ case ActiveMQTextMessage.ID_ACTIVEMQTEXTMESSAGE:
+ return "ACTIVEMQ_TEXT_MESSAGE";
+ case ActiveMQObjectMessage.ID_ACTIVEMQOBJECTMESSAGE:
+ return "ACTIVEMQ_OBJECT_MESSAGE";
+ case ActiveMQBytesMessage.ID_ACTIVEMQBYTESMESSAGE:
+ return "ACTIVEMQ_BYTES_MESSAGE";
+ case ActiveMQStreamMessage.ID_ACTIVEMQSTREAMMESSAGE:
+ return "ACTIVEMQ_STREAM_MESSAGE";
+ case ActiveMQMapMessage.ID_ACTIVEMQMAPMESSAGE:
+ return "ACTIVEMQ_MAP_MESSAGE";
+ case MessageAck.ID_MESSAGEACK:
+ return "ACTIVEMQ_MSG_ACK";
+ case Response.ID_RESPONSE:
+ return "RESPONSE";
+ case ConsumerInfo.ID_CONSUMERINFO:
+ return "CONSUMER_INFO";
+ case ProducerInfo.ID_PRODUCERINFO:
+ return "PRODUCER_INFO";
+ case TransactionInfo.ID_TRANSACTIONINFO:
+ return "TRANSACTION_INFO";
+ case BrokerInfo.ID_BROKERINFO:
+ return "BROKER_INFO";
+ case ConnectionInfo.ID_CONNECTIONINFO:
+ return "CONNECTION_INFO";
+ case SessionInfo.ID_SESSIONINFO:
+ return "SESSION_INFO";
+ case RemoveSubscriptionInfo.ID_REMOVESUBSCRIPTIONINFO:
+ return "DURABLE_UNSUBSCRIBE";
+ case IntegerResponse.ID_INTEGERRESPONSE:
+ return "INT_RESPONSE_RECEIPT_INFO";
+ case WireFormatInfo.ID_WIREFORMATINFO:
+ return "WIRE_FORMAT_INFO";
+ case RemoveInfo.ID_REMOVEINFO:
+ return "REMOVE_INFO";
+ case KeepAliveInfo.ID_KEEPALIVEINFO:
+ return "KEEP_ALIVE";
+ default:
+ return "";
+ }
+ }
+
+ /// <summary>
+ /// Default visitor entry point: always throws ApplicationException.
+ /// </summary>
+ public virtual Response visit(ICommandVisitor visitor)
+ {
+ throw new ApplicationException("BaseCommand.Visit() not implemented");
+ }
+
+ // Type queries. Each one answers false here and is virtual so that a
+ // subclass can override it.
+
+ public virtual bool IsBrokerInfo
+ {
+ get { return false; }
+ }
+
+ public virtual bool IsConnectionInfo
+ {
+ get { return false; }
+ }
+
+ // Required by the Command interface.
+ public virtual bool IsConnectedCommand
+ {
+ get { return false; }
+ }
+
+ public virtual bool IsConnectionError
+ {
+ get { return false; }
+ }
+
+ // Required by the Command interface.
+ public virtual bool IsErrorCommand
+ {
+ get { return false; }
+ }
+
+ public virtual bool IsConsumerInfo
+ {
+ get { return false; }
+ }
+
+ public virtual bool IsControlCommand
+ {
+ get { return false; }
+ }
+
+ public virtual bool IsDestinationInfo
+ {
+ get { return false; }
+ }
+
+ public virtual bool IsMessage
+ {
+ get { return false; }
+ }
+
+ public virtual bool IsMessageAck
+ {
+ get { return false; }
+ }
+
+ public virtual bool IsMessageDispatch
+ {
+ get { return false; }
+ }
+
+ public virtual bool IsProducerAck
+ {
+ get { return false; }
+ }
+
+ public virtual bool IsProducerInfo
+ {
+ get { return false; }
+ }
+
+ public virtual bool IsRemoveInfo
+ {
+ get { return false; }
+ }
+
+ public virtual bool IsRemoveSubscriptionInfo
+ {
+ get { return false; }
+ }
+
+ public virtual bool IsResponse
+ {
+ get { return false; }
+ }
+
+ public virtual bool IsSessionInfo
+ {
+ get { return false; }
+ }
+
+ public virtual bool IsShutdownInfo
+ {
+ get { return false; }
+ }
+
+ public virtual bool IsTransactionInfo
+ {
+ get { return false; }
+ }
+
+ public virtual bool IsWireFormatInfo
+ {
+ get { return false; }
+ }
+
+ public virtual bool ResponseRequired
+ {
+ get { return responseRequired; }
+ set { responseRequired = value; }
+ }
+
+ /// <summary>
+ /// Returns the copy produced by the base class's Clone(). The fields
+ /// declared here are an int and a bool, so nothing needs a deeper copy.
+ /// </summary>
+ public override Object Clone()
+ {
+ return (BaseCommand) base.Clone();
+ }
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseDataStructure.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseDataStructure.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseDataStructure.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseDataStructure.cs Thu Dec 3 23:11:03 2009
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.Stomp.Commands
+{
+
+ /// <summary>
+ /// Base class for all DataStructure implementations. Every member except
+ /// HashCode() is virtual and supplies a neutral default: type 0, not
+ /// marshall aware, empty marshalling hooks and a null marshalled form.
+ /// </summary>
+ public abstract class BaseDataStructure : DataStructure, ICloneable
+ {
+ public virtual byte GetDataStructureType() { return 0; }
+
+ public virtual bool IsMarshallAware() { return false; }
+
+ // Marshalling hooks: intentionally empty here.
+ public virtual void BeforeMarshall(OpenWireFormat wireFormat) { }
+
+ public virtual void AfterMarshall(OpenWireFormat wireFormat) { }
+
+ public virtual void BeforeUnmarshall(OpenWireFormat wireFormat) { }
+
+ public virtual void AfterUnmarshall(OpenWireFormat wireFormat) { }
+
+ public virtual void SetMarshalledForm(OpenWireFormat wireFormat, byte[] data) { }
+
+ public virtual byte[] GetMarshalledForm(OpenWireFormat wireFormat) { return null; }
+
+ /// <summary>
+ /// Null-tolerant hash helper: the value's own hash code, or -1 for null.
+ /// </summary>
+ public int HashCode(object value)
+ {
+ return (value == null) ? -1 : value.GetHashCode();
+ }
+
+ /// <summary>
+ /// Shallow copy through MemberwiseClone(), which also copies the fields
+ /// declared by derived classes. No deep cloning is performed.
+ /// </summary>
+ public virtual Object Clone()
+ {
+ return this.MemberwiseClone();
+ }
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseDataStructure.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs Thu Dec 3 23:11:03 2009
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.Stomp.Commands
+{
+ /// <summary>
+ /// A Stomp command: an id, a response-required flag and a set of
+ /// read-only Is* queries that identify the kind of command.
+ /// </summary>
+ public interface Command : ICloneable
+ {
+ int CommandId { get; set; }
+
+ bool ResponseRequired { get; set; }
+
+ bool IsConnectionInfo { get; }
+
+ bool IsConnectedCommand { get; }
+
+ bool IsErrorCommand { get; }
+
+ bool IsDestinationInfo { get; }
+
+ bool IsMessage { get; }
+
+ bool IsMessageAck { get; }
+
+ bool IsMessageDispatch { get; }
+
+ bool IsRemoveInfo { get; }
+
+ bool IsRemoveSubscriptionInfo { get; }
+
+ bool IsResponse { get; }
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructure.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructure.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructure.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructure.cs Thu Dec 3 23:11:03 2009
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.Stomp.Commands
+{
+
+ /// <summary>
+ /// Minimal contract of a data structure: it reports a one-byte type code
+ /// and whether it is marshall aware, and it can be cloned.
+ /// </summary>
+ public interface DataStructure : ICloneable
+ {
+ // One-byte code identifying the kind of data structure.
+ byte GetDataStructureType();
+ // Whether this data structure is marshall aware.
+ bool IsMarshallAware();
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructure.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructure.cs
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Response.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Response.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Response.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Response.cs Thu Dec 3 23:11:03 2009
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+
+namespace Apache.NMS.Stomp.Commands
+{
+
+ /// <summary>
+ /// Command that carries a CorrelationId and reports itself as a response
+ /// through IsResponse.
+ /// </summary>
+ public class Response : BaseCommand
+ {
+ private int correlationId;
+
+ public int CorrelationId
+ {
+ get { return correlationId; }
+ set { correlationId = value; }
+ }
+
+ /// <summary>
+ /// Always true for this class.
+ /// </summary>
+ public override bool IsResponse
+ {
+ get { return true; }
+ }
+
+ /// <summary>
+ /// Formats this object as its runtime type name followed by
+ /// "[CorrelationId=n]".
+ /// </summary>
+ public override string ToString()
+ {
+ return GetType().Name + "[CorrelationId=" + correlationId + "]";
+ }
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Response.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs Thu Dec 3 23:11:03 2009
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Stomp.Transport;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.Policies;
+
+namespace Apache.NMS.Stomp
+{
+ /// <summary>
+ /// Creates connections to a message broker. The broker Uri, credentials,
+ /// client id and redelivery / prefetch policies configured here are applied
+ /// to every connection the factory creates.
+ /// </summary>
+ public class ConnectionFactory : IConnectionFactory
+ {
+ public const string DEFAULT_BROKER_URL = "tcp://localhost:61613";
+ public const string ENV_BROKER_URL = "STOMP_BROKER_URL";
+
+ private static event ExceptionListener onException;
+ private Uri brokerUri;
+ private string connectionUserName;
+ private string connectionPassword;
+ private string clientId;
+
+ private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+ private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
+
+ static ConnectionFactory()
+ {
+ TransportFactory.OnException += ConnectionFactory.ExceptionHandler;
+ }
+
+ /// <summary>
+ /// Returns the value of the STOMP_BROKER_URL environment variable, or
+ /// DEFAULT_BROKER_URL when it is not set. Compact Framework builds
+ /// always return DEFAULT_BROKER_URL.
+ /// </summary>
+ public static string GetDefaultBrokerUrl()
+ {
+#if (PocketPC||NETCF||NETCF_2_0)
+ return DEFAULT_BROKER_URL;
+#else
+ return Environment.GetEnvironmentVariable(ENV_BROKER_URL) ?? DEFAULT_BROKER_URL;
+#endif
+ }
+
+ public ConnectionFactory()
+ : this(GetDefaultBrokerUrl())
+ {
+ }
+
+ public ConnectionFactory(string brokerUri)
+ : this(brokerUri, null)
+ {
+ }
+
+ public ConnectionFactory(string brokerUri, string clientID)
+ : this(new Uri(brokerUri), clientID)
+ {
+ }
+
+ public ConnectionFactory(Uri brokerUri)
+ : this(brokerUri, null)
+ {
+ }
+
+ public ConnectionFactory(Uri brokerUri, string clientID)
+ {
+ this.brokerUri = brokerUri;
+ this.clientId = clientID;
+ }
+
+ /// <summary>
+ /// Creates a connection using the UserName and Password set on the factory.
+ /// </summary>
+ public IConnection CreateConnection()
+ {
+ return CreateConnection(connectionUserName, connectionPassword);
+ }
+
+ /// <summary>
+ /// Creates a connection for the given credentials, applies the factory
+ /// policies and any "connection."-prefixed Uri parameters to it, and
+ /// starts its transport. If configuring or starting fails, the new
+ /// connection is closed and the original exception is rethrown.
+ /// </summary>
+ public IConnection CreateConnection(string userName, string password)
+ {
+ // Strip off the activemq prefix, if it exists.
+ Uri uri = new Uri(URISupport.stripPrefix(brokerUri.OriginalString, "activemq:"));
+
+ Tracer.InfoFormat("Connecting to: {0}", uri.ToString());
+
+ ConnectionInfo info = CreateConnectionInfo(userName, password);
+ ITransport transport = TransportFactory.CreateTransport(uri);
+ Connection connection = new Connection(uri, transport, info);
+
+ try
+ {
+ // Set the Factory level configuration to the Connection, this can be overridden by
+ // the params on the Connection URI so we do this before applying the params.
+ connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
+ connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
+
+ // Set properties on connection using parameters prefixed with "connection."
+ // Since this could be a composite Uri, assume the connection-specific parameters
+ // are associated with the outer-most specification of the composite Uri. What's nice
+ // is that this works with simple Uri as well.
+ URISupport.CompositeData c = URISupport.parseComposite(uri);
+ URISupport.SetProperties(connection, c.Parameters, "connection.");
+
+ connection.ITransport.Start();
+ return connection;
+ }
+ catch
+ {
+ // The caller never receives this connection, so nobody else can
+ // release it. NOTE(review): assumes Connection.Close() also
+ // releases the transport - confirm against Connection.
+ try
+ {
+ connection.Close();
+ }
+ catch(Exception)
+ {
+ // Best-effort cleanup: the first failure is the one to report.
+ }
+ throw;
+ }
+ }
+
+ // Properties
+
+ /// <summary>
+ /// Get/or set the broker Uri.
+ /// </summary>
+ public Uri BrokerUri
+ {
+ get { return brokerUri; }
+ set { brokerUri = value; }
+ }
+
+ public string UserName
+ {
+ get { return connectionUserName; }
+ set { connectionUserName = value; }
+ }
+
+ public string Password
+ {
+ get { return connectionPassword; }
+ set { connectionPassword = value; }
+ }
+
+ public string ClientId
+ {
+ get { return clientId; }
+ set { clientId = value; }
+ }
+
+ /// <summary>
+ /// Policy cloned onto each new connection. Assigning null is ignored
+ /// and keeps the current policy.
+ /// </summary>
+ public IRedeliveryPolicy RedeliveryPolicy
+ {
+ get { return this.redeliveryPolicy; }
+ set
+ {
+ if(value != null)
+ {
+ this.redeliveryPolicy = value;
+ }
+ }
+ }
+
+ /// <summary>
+ /// Subscribes to exceptions forwarded from TransportFactory.OnException.
+ /// The backing event is static, so listeners are shared by all factories.
+ /// </summary>
+ public event ExceptionListener OnException
+ {
+ add { onException += value; }
+ remove
+ {
+ if(onException != null)
+ {
+ onException -= value;
+ }
+ }
+ }
+
+ /// <summary>
+ /// Builds the ConnectionInfo for a new connection: a fresh GUID as the
+ /// connection id, the given credentials, and the factory's client id
+ /// (or a fresh GUID when no client id was configured).
+ /// </summary>
+ protected virtual ConnectionInfo CreateConnectionInfo(string userName, string password)
+ {
+ ConnectionInfo answer = new ConnectionInfo();
+ ConnectionId connectionId = new ConnectionId();
+ connectionId.Value = CreateNewGuid();
+
+ answer.ConnectionId = connectionId;
+ answer.UserName = userName;
+ answer.Password = password;
+ answer.ClientId = clientId ?? CreateNewGuid();
+
+ return answer;
+ }
+
+ protected static string CreateNewGuid()
+ {
+ return Guid.NewGuid().ToString();
+ }
+
+ /// <summary>
+ /// Forwards an exception to the OnException listeners, if there are any.
+ /// </summary>
+ protected static void ExceptionHandler(Exception ex)
+ {
+ // Snapshot the delegate first: if the last listener were removed between
+ // the null check and the call, invoking the field would throw
+ // NullReferenceException.
+ ExceptionListener handler = ConnectionFactory.onException;
+ if(handler != null)
+ {
+ handler(ex);
+ }
+ }
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrameStream.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrameStream.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrameStream.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrameStream.cs Thu Dec 3 23:11:03 2009
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System.Reflection;
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Stomp.Transport;
+using Apache.NMS;
+using System;
+using System.Collections;
+using System.IO;
+using System.Text;
+
+namespace Apache.NMS.Stomp.Protocol
+{
+ /// <summary>
+ /// Writes one <a href="http://stomp.codehaus.org/">STOMP</a> Frame. The command
+ /// line and the headers are buffered as text; Flush() writes them, the optional
+ /// binary content and the terminating NULL byte to the underlying writer.
+ /// </summary>
+ public class StompFrameStream
+ {
+ /// Used to terminate a header line or end of a headers section of the Frame.
+ public const String NEWLINE = "\n";
+ /// Used to separate the Key / Value pairing in Frame Headers
+ public const String SEPARATOR = ":";
+ /// Used to mark the End of the Frame.
+ public const byte FRAME_TERMINUS = (byte) 0;
+
+ private StringBuilder builder = new StringBuilder();
+ private BinaryWriter ds;
+ private byte[] content;
+ private int contentLength = -1;
+ private Encoding encoding;
+
+ public StompFrameStream(BinaryWriter ds, Encoding encoding)
+ {
+ this.ds = ds;
+ this.encoding = encoding;
+ }
+
+ /// <summary>
+ /// Binary body written after the headers by Flush(); null means no body.
+ /// </summary>
+ public byte[] Content
+ {
+ get { return content; }
+ set { content = value; }
+ }
+
+ /// <summary>
+ /// Setting this value also appends a "content-length" header to the frame,
+ /// once per assignment.
+ /// </summary>
+ public int ContentLength
+ {
+ get { return contentLength; }
+ set
+ {
+ contentLength = value;
+ WriteHeader("content-length", contentLength);
+ }
+ }
+
+ /// <summary>
+ /// Starts the frame with the given command name; errors are not ignored.
+ /// </summary>
+ public void WriteCommand(Command command, String name)
+ {
+ WriteCommand(command, name, false);
+ }
+
+ /// <summary>
+ /// Starts the frame with the given command name. When the command requires
+ /// a response, a "receipt" header carrying the command id is added; with
+ /// ignoreErrors the id is prefixed with "ignore:".
+ /// </summary>
+ public void WriteCommand(Command command, String name, bool ignoreErrors)
+ {
+ builder.Append(name).Append(NEWLINE);
+ if(!command.ResponseRequired)
+ {
+ return;
+ }
+
+ int id = command.CommandId;
+ Object receipt = ignoreErrors ? (Object) ("ignore:" + id) : (Object) id;
+ WriteHeader("receipt", receipt);
+ }
+
+ /// <summary>
+ /// Appends a "name:value" header line; a null value writes nothing.
+ /// </summary>
+ public void WriteHeader(String name, Object value)
+ {
+ if (value == null)
+ {
+ return;
+ }
+
+ builder.Append(name).Append(SEPARATOR).Append(value).Append(NEWLINE);
+ }
+
+ /// <summary>
+ /// Appends a "name:true" header line; a false value writes nothing.
+ /// </summary>
+ public void WriteHeader(String name, bool value)
+ {
+ if (value)
+ {
+ WriteHeader(name, (Object) "true");
+ }
+ }
+
+ /// <summary>
+ /// Writes the buffered command and headers, an empty line, the content
+ /// (if any) and the frame terminator to the underlying writer.
+ /// </summary>
+ public void Flush()
+ {
+ // The extra newline ends the headers section.
+ builder.Append(NEWLINE);
+ byte[] head = encoding.GetBytes(builder.ToString());
+ ds.Write(head);
+
+ if (content != null)
+ {
+ ds.Write(content);
+ }
+
+ // Always write a terminating NULL byte to end the content frame.
+ ds.Write(FRAME_TERMINUS);
+ }
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrameStream.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompHelper.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompHelper.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompHelper.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompHelper.cs Thu Dec 3 23:11:03 2009
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Text;
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS;
+
+namespace Apache.NMS.Stomp.Protocol
+{
+ /// <summary>
+ /// Some <a href="http://stomp.codehaus.org/">STOMP</a> protocol conversion helper methods.
+ /// </summary>
+ public class StompHelper
+ {
+ /// <summary>
+ /// Parses the run of ASCII decimal digits at the start of
+ /// <paramref name="text"/>, stopping at the first other character.
+ /// </summary>
+ /// <param name="text">text that may start with digits; may be null or empty</param>
+ /// <returns>
+ /// The value of the leading digits, or 0 when the text is null, empty or
+ /// does not start with a digit.
+ /// </returns>
+ /// <exception cref="OverflowException">the digits do not fit in an Int32</exception>
+ private static int ParseInt(string text)
+ {
+     if(String.IsNullOrEmpty(text))
+     {
+         return 0;
+     }
+
+     int result = 0;
+
+     for(int idx = 0; idx < text.Length; idx++)
+     {
+         char c = text[idx];
+
+         // Only '0'..'9' count. char.IsNumber also accepts characters such as
+         // superscripts and fractions, which cannot be converted to an int.
+         if(c < '0' || c > '9')
+         {
+             break;
+         }
+
+         // checked: an over-long digit run raises OverflowException
+         // instead of silently wrapping around.
+         result = checked(result * 10 + (c - '0'));
+     }
+
+     return result;
+ }
+
+ /// <summary>
+ /// Converts a STOMP destination name into a destination object.
+ /// </summary>
+ /// <remarks>
+ /// The "/queue/", "/topic/", "/temp-topic/" and "/temp-queue/" prefixes are
+ /// stripped from the name. The "/remote-temp-topic/" and "/remote-temp-queue/"
+ /// prefixes select a temporary type but stay part of the name. Any other
+ /// text is treated as a queue name. Prefixes are matched ignoring case with
+ /// an ordinal comparison, so the result does not depend on the current
+ /// culture.
+ /// </remarks>
+ /// <param name="text">destination as written in a STOMP header; may be null</param>
+ /// <returns>the destination, or null when <paramref name="text"/> is null</returns>
+ public static ActiveMQDestination ToDestination(string text)
+ {
+     if(text == null)
+     {
+         return null;
+     }
+
+     int type = ActiveMQDestination.ACTIVEMQ_QUEUE;
+
+     if(text.StartsWith("/queue/", StringComparison.OrdinalIgnoreCase))
+     {
+         text = text.Substring("/queue/".Length);
+     }
+     else if(text.StartsWith("/topic/", StringComparison.OrdinalIgnoreCase))
+     {
+         text = text.Substring("/topic/".Length);
+         type = ActiveMQDestination.ACTIVEMQ_TOPIC;
+     }
+     else if(text.StartsWith("/temp-topic/", StringComparison.OrdinalIgnoreCase))
+     {
+         text = text.Substring("/temp-topic/".Length);
+         type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_TOPIC;
+     }
+     else if(text.StartsWith("/temp-queue/", StringComparison.OrdinalIgnoreCase))
+     {
+         text = text.Substring("/temp-queue/".Length);
+         type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_QUEUE;
+     }
+     else if(text.StartsWith("/remote-temp-topic/", StringComparison.OrdinalIgnoreCase))
+     {
+         // The prefix is kept in the name on purpose.
+         type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_TOPIC;
+     }
+     else if(text.StartsWith("/remote-temp-queue/", StringComparison.OrdinalIgnoreCase))
+     {
+         // The prefix is kept in the name on purpose.
+         type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_QUEUE;
+     }
+
+     return ActiveMQDestination.CreateDestination(type, text);
+ }
+
+ /// <summary>
+ /// Converts a destination into the name used in STOMP headers.
+ /// </summary>
+ /// <remarks>
+ /// Topics get a "/topic/" prefix, temporary topics "/temp-topic/", temporary
+ /// queues "/temp-queue/" and everything else "/queue/". A temporary
+ /// destination whose physical name already starts with the matching
+ /// "/remote-temp-.../" prefix is returned unchanged. That prefix is matched
+ /// ignoring case with an ordinal comparison, independent of the current culture.
+ /// </remarks>
+ /// <param name="destination">destination to convert; may be null</param>
+ /// <returns>the STOMP name, or null when <paramref name="destination"/> is null</returns>
+ public static string ToStomp(ActiveMQDestination destination)
+ {
+     if(destination == null)
+     {
+         return null;
+     }
+
+     switch (destination.DestinationType)
+     {
+         case DestinationType.Topic:
+             return "/topic/" + destination.PhysicalName;
+
+         case DestinationType.TemporaryTopic:
+             if (destination.PhysicalName.StartsWith("/remote-temp-topic/", StringComparison.OrdinalIgnoreCase))
+             {
+                 return destination.PhysicalName;
+             }
+
+             return "/temp-topic/" + destination.PhysicalName;
+
+         case DestinationType.TemporaryQueue:
+             if (destination.PhysicalName.StartsWith("/remote-temp-queue/", StringComparison.OrdinalIgnoreCase))
+             {
+                 return destination.PhysicalName;
+             }
+
+             return "/temp-queue/" + destination.PhysicalName;
+
+         default:
+             return "/queue/" + destination.PhysicalName;
+     }
+ }
+
+ /// <summary>
+ /// Formats a consumer id as "connectionId:sessionId:value".
+ /// </summary>
+ public static string ToStomp(ConsumerId id)
+ {
+     StringBuilder consumerBuilder = new StringBuilder();
+
+     consumerBuilder.Append(id.ConnectionId);
+     consumerBuilder.Append(":");
+     consumerBuilder.Append(id.SessionId);
+     consumerBuilder.Append(":");
+     consumerBuilder.Append(id.Value);
+
+     return consumerBuilder.ToString();
+ }
+
+ /// <summary>
+ /// Parses text of the form "connectionId:sessionId:value" into a consumer id.
+ /// </summary>
+ /// <remarks>
+ /// The text is split from the right: the part after the last ':' becomes
+ /// Value, the part after the next ':' becomes SessionId and whatever is
+ /// left becomes ConnectionId. Missing parts are simply not assigned. A
+ /// parsing failure is logged at debug level and the text left at that
+ /// point is used as the ConnectionId.
+ /// </remarks>
+ /// <returns>the consumer id, or null when <paramref name="text"/> is null</returns>
+ public static ConsumerId ToConsumerId(string text)
+ {
+     if(text == null)
+     {
+         return null;
+     }
+
+     ConsumerId answer = new ConsumerId();
+     string remainder = text;
+
+     try
+     {
+         int colon = remainder.LastIndexOf(':');
+         if (colon >= 0)
+         {
+             answer.Value = ParseInt(remainder.Substring(colon + 1));
+             remainder = remainder.Substring(0, colon);
+
+             colon = remainder.LastIndexOf(':');
+             if (colon >= 0)
+             {
+                 answer.SessionId = ParseInt(remainder.Substring(colon + 1));
+                 remainder = remainder.Substring(0, colon);
+             }
+         }
+     }
+     catch(Exception ex)
+     {
+         Tracer.Debug(ex.Message);
+     }
+
+     answer.ConnectionId = remainder;
+     return answer;
+ }
+
+ /// <summary>
+ /// Formats a producer id as "connectionId:sessionId:value".
+ /// </summary>
+ public static string ToStomp(ProducerId id)
+ {
+     return id.ConnectionId + ":" + id.SessionId + ":" + id.Value;
+ }
+
+ /// <summary>
+ /// Parses text of the form "connectionId:sessionId:value" into a producer id.
+ /// </summary>
+ /// <remarks>
+ /// The text is split from the right: last segment into Value, the one
+ /// before it into SessionId, the rest into ConnectionId. A parsing failure
+ /// is logged at debug level and the text left at that point is used as the
+ /// ConnectionId.
+ /// </remarks>
+ /// <returns>the producer id, or null when <paramref name="text"/> is null</returns>
+ public static ProducerId ToProducerId(string text)
+ {
+     if(text == null)
+     {
+         return null;
+     }
+
+     ProducerId answer = new ProducerId();
+     int split = text.LastIndexOf(':');
+
+     if(split < 0)
+     {
+         // No separator at all: the whole text is the connection id.
+         answer.ConnectionId = text;
+         return answer;
+     }
+
+     try
+     {
+         answer.Value = ParseInt(text.Substring(split + 1));
+         text = text.Substring(0, split);
+
+         split = text.LastIndexOf(':');
+         if(split >= 0)
+         {
+             answer.SessionId = ParseInt(text.Substring(split + 1));
+             text = text.Substring(0, split);
+         }
+     }
+     catch(Exception ex)
+     {
+         Tracer.Debug(ex.Message);
+     }
+
+     answer.ConnectionId = text;
+     return answer;
+ }
+
+ /// <summary>
+ /// Formats a message id as the STOMP form of its producer id followed by
+ /// ":" and the producer sequence id.
+ /// </summary>
+ public static string ToStomp(MessageId id)
+ {
+     return ToStomp(id.ProducerId) + ":" + id.ProducerSequenceId;
+ }
+
+ /// <summary>
+ /// Parses a STOMP message id: the part after the last ':' becomes the
+ /// producer sequence id and the rest is parsed as the producer id.
+ /// </summary>
+ /// <remarks>
+ /// A failure while reading the sequence id is logged at debug level; the
+ /// text left at that point is still handed to ToProducerId.
+ /// </remarks>
+ /// <returns>the message id, or null when <paramref name="text"/> is null</returns>
+ public static MessageId ToMessageId(string text)
+ {
+     if(text == null)
+     {
+         return null;
+     }
+
+     MessageId answer = new MessageId();
+     string producerPart = text;
+     int split = producerPart.LastIndexOf(':');
+
+     if (split >= 0)
+     {
+         try
+         {
+             answer.ProducerSequenceId = ParseInt(producerPart.Substring(split + 1));
+             producerPart = producerPart.Substring(0, split);
+         }
+         catch(Exception ex)
+         {
+             Tracer.Debug(ex.Message);
+         }
+     }
+
+     answer.ProducerId = ToProducerId(producerPart);
+     return answer;
+ }
+
+ /// <summary>
+ /// Formats a transaction id for a STOMP header. Local transaction ids use
+ /// the LocalTransactionId overload; any other id uses its ToString().
+ /// </summary>
+ public static string ToStomp(TransactionId id)
+ {
+     if(!(id is LocalTransactionId))
+     {
+         return id.ToString();
+     }
+
+     return ToStomp((LocalTransactionId) id);
+ }
+
+ /// <summary>
+ /// Formats a local transaction id as "connectionIdValue:value".
+ /// </summary>
+ public static string ToStomp(LocalTransactionId transactionId)
+ {
+     StringBuilder transactionBuilder = new StringBuilder();
+
+     transactionBuilder.Append(transactionId.ConnectionId.Value);
+     transactionBuilder.Append(":");
+     transactionBuilder.Append(transactionId.Value);
+
+     return transactionBuilder.ToString();
+ }
+
+ /// <summary>
+ /// Interprets a header value as a boolean.
+ /// </summary>
+ /// <param name="text">header value; may be null</param>
+ /// <param name="defaultValue">result used when <paramref name="text"/> is null</param>
+ /// <returns>
+ /// <paramref name="defaultValue"/> for null text; otherwise true only when
+ /// the text is "true" in any letter case. The comparison is ordinal, so it
+ /// does not depend on the current culture.
+ /// </returns>
+ public static bool ToBool(string text, bool defaultValue)
+ {
+     if(text == null)
+     {
+         return defaultValue;
+     }
+
+     return String.Equals("true", text, StringComparison.OrdinalIgnoreCase);
+ }
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompHelper.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs Thu Dec 3 23:11:03 2009
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Stomp.Transport;
+using Apache.NMS;
+using System;
+using System.Collections;
+using System.IO;
+using System.Text;
+
+namespace Apache.NMS.Stomp.Protocol
+{
+ /// <summary>
+ /// Implements the <a href="http://stomp.codehaus.org/">STOMP</a> protocol.
+ /// </summary>
+ public class StompWireFormat : IWireFormat
+ {
+ // Encoding used for frame text, both when writing and when reading.
+ private Encoding encoding = new UTF8Encoding();
+
+ // Transport that auto-generated responses are handed to.
+ private ITransport transport;
+
+ // Ids of the consumers that have been subscribed and not yet unsubscribed.
+ private IDictionary consumers = Hashtable.Synchronized(new Hashtable());
+
+ /// <summary>
+ /// Creates a wire format with no transport assigned.
+ /// </summary>
+ public StompWireFormat()
+ {
+ }
+
+ /// <summary>
+ /// Transport associated with this wire format.
+ /// </summary>
+ public ITransport Transport
+ {
+     get { return transport; }
+     set { transport = value; }
+ }
+
+ /// <summary>
+ /// Version number reported by this wire format; always 1.
+ /// </summary>
+ public int Version
+ {
+     get { return 1; }
+ }
+
+ /// <summary>
+ /// Writes the STOMP frame for <paramref name="o"/> to
+ /// <paramref name="binaryWriter"/>.
+ /// </summary>
+ /// <remarks>
+ /// Connection, message, consumer, ack, transaction, shutdown and remove
+ /// commands each have their own writer. Any other Command writes nothing;
+ /// if it requires a response, an empty Response is passed to SendCommand
+ /// instead. Objects that are not commands are logged and ignored.
+ /// </remarks>
+ public void Marshal(Object o, BinaryWriter binaryWriter)
+ {
+     Tracer.Debug(">>>> " + o);
+     StompFrameStream ds = new StompFrameStream(binaryWriter, encoding);
+
+     // The order of the type checks is significant and must be kept.
+     if (o is ConnectionInfo)
+     {
+         WriteConnectionInfo((ConnectionInfo) o, ds);
+         return;
+     }
+
+     if (o is ActiveMQMessage)
+     {
+         WriteMessage((ActiveMQMessage) o, ds);
+         return;
+     }
+
+     if (o is ConsumerInfo)
+     {
+         WriteConsumerInfo((ConsumerInfo) o, ds);
+         return;
+     }
+
+     if (o is MessageAck)
+     {
+         WriteMessageAck((MessageAck) o, ds);
+         return;
+     }
+
+     if (o is TransactionInfo)
+     {
+         WriteTransactionInfo((TransactionInfo) o, ds);
+         return;
+     }
+
+     if (o is ShutdownInfo)
+     {
+         WriteShutdownInfo((ShutdownInfo) o, ds);
+         return;
+     }
+
+     if (o is RemoveInfo)
+     {
+         WriteRemoveInfo((RemoveInfo) o, ds);
+         return;
+     }
+
+     if (!(o is Command))
+     {
+         Tracer.Debug("#### Ignored command: " + o.GetType());
+         return;
+     }
+
+     // A command with no STOMP equivalent: answer it locally if required.
+     Command command = (Command) o;
+     if (command.ResponseRequired)
+     {
+         Response response = new Response();
+         response.CorrelationId = command.CommandId;
+         SendCommand(response);
+         Tracer.Debug("#### Autorespond to command: " + o.GetType());
+     }
+ }
+
+
+ /// <summary>
+ /// Reads bytes up to, but not including, the next line feed (byte 10) and
+ /// decodes them with the wire format's encoding.
+ /// </summary>
+ /// <remarks>
+ /// The line is read byte by byte. BinaryReader.Read() returns a decoded
+ /// character rather than a byte, so using it here would mangle any
+ /// multi-byte text when the value is narrowed to a byte.
+ /// </remarks>
+ /// <param name="dis">reader positioned at the start of a line</param>
+ /// <returns>the decoded line without its terminating line feed</returns>
+ /// <exception cref="IOException">the stream ended before a line feed was read</exception>
+ internal String ReadLine(BinaryReader dis)
+ {
+     MemoryStream ms = new MemoryStream();
+     while (true)
+     {
+         byte nextByte;
+         try
+         {
+             nextByte = dis.ReadByte();
+         }
+         catch (EndOfStreamException)
+         {
+             throw new IOException("Peer closed the stream.");
+         }
+
+         if( nextByte == 10 )
+         {
+             break;
+         }
+         ms.WriteByte(nextByte);
+     }
+     byte[] data = ms.ToArray();
+     return encoding.GetString(data, 0, data.Length);
+ }
+
+ /// <summary>
+ /// Reads one STOMP frame from <paramref name="dis"/> and converts it into
+ /// a command object.
+ /// </summary>
+ /// <remarks>
+ /// Blank lines before the command line are skipped. Header lines are read
+ /// until an empty line; a line without a ':' after its first character is
+ /// ignored. With a "content-length" header exactly that many bytes are
+ /// read, followed by the terminating NULL byte. Otherwise the body runs up
+ /// to the next NULL byte or the end of the stream. Body bytes are read
+ /// raw, never as decoded characters, so binary and multi-byte content
+ /// arrives unchanged.
+ /// </remarks>
+ /// <returns>the object built by CreateCommand; may be null for unknown frames</returns>
+ public Object Unmarshal(BinaryReader dis)
+ {
+     string command;
+     do {
+         command = ReadLine(dis);
+     }
+     while (command == "");
+
+     Tracer.Debug("<<<< command: " + command);
+
+     IDictionary headers = new Hashtable();
+     string line;
+     while ((line = ReadLine(dis)) != "")
+     {
+         int idx = line.IndexOf(':');
+         if (idx > 0)
+         {
+             string key = line.Substring(0, idx);
+             string value = line.Substring(idx + 1);
+             headers[key] = value;
+
+             Tracer.Debug("<<<< header: " + key + " = " + value);
+         }
+         else
+         {
+             // lets ignore this bad header!
+         }
+     }
+
+     byte[] content = null;
+     string length = ToString(headers["content-length"]);
+     if (length != null)
+     {
+         int size = Int32.Parse(length);
+         content = dis.ReadBytes(size);
+
+         // Read the terminating NULL byte for this frame.
+         int nullByte = ReadRawByte(dis);
+         if(nullByte != 0)
+         {
+             Tracer.Debug("<<<< error reading frame null byte.");
+         }
+     }
+     else
+     {
+         MemoryStream ms = new MemoryStream();
+         int nextByte;
+
+         // Stop at the NULL terminator (0) or at the end of the stream (-1).
+         while((nextByte = ReadRawByte(dis)) > 0)
+         {
+             ms.WriteByte((byte)nextByte);
+         }
+         content = ms.ToArray();
+     }
+
+     Object answer = CreateCommand(command, headers, content);
+     Tracer.Debug("<<<< received: " + answer);
+     return answer;
+ }
+
+ /// <summary>
+ /// Reads a single raw byte, returning -1 instead of throwing when the
+ /// stream has ended.
+ /// </summary>
+ private static int ReadRawByte(BinaryReader dis)
+ {
+     try
+     {
+         return dis.ReadByte();
+     }
+     catch (EndOfStreamException)
+     {
+         return -1;
+     }
+ }
+
+ /// <summary>
+ /// Builds the command object for a received frame.
+ /// </summary>
+ /// <remarks>
+ /// RECEIPT and CONNECTED frames become a Response correlated through
+ /// "receipt-id" (an "ignore:" prefix is dropped) or, for CONNECTED only,
+ /// through "response-id". ERROR frames become a plain Response when their
+ /// receipt id starts with "ignore:", otherwise an ExceptionResponse
+ /// carrying the "message" and "exceptionClass" headers. MESSAGE frames are
+ /// handed to ReadMessage. Everything else is logged as unknown.
+ /// </remarks>
+ /// <returns>the command object, or null when the frame could not be mapped</returns>
+ protected virtual Object CreateCommand(string command, IDictionary headers, byte[] content)
+ {
+     switch(command)
+     {
+         case "RECEIPT":
+         case "CONNECTED":
+         {
+             string receiptId = RemoveHeader(headers, "receipt-id");
+             if(receiptId != null)
+             {
+                 Response receipt = new Response();
+                 if(receiptId.StartsWith("ignore:"))
+                 {
+                     receiptId = receiptId.Substring("ignore:".Length);
+                 }
+
+                 receipt.CorrelationId = Int32.Parse(receiptId);
+                 return receipt;
+             }
+
+             if(command == "CONNECTED")
+             {
+                 string responseId = RemoveHeader(headers, "response-id");
+                 if (responseId != null)
+                 {
+                     Response connected = new Response();
+                     connected.CorrelationId = Int32.Parse(responseId);
+                     return connected;
+                 }
+             }
+
+             // No usable correlation header: reported as unknown below.
+             break;
+         }
+
+         case "ERROR":
+         {
+             string errorReceiptId = RemoveHeader(headers, "receipt-id");
+
+             if(errorReceiptId != null && errorReceiptId.StartsWith("ignore:"))
+             {
+                 // The sender asked for errors on this command to be ignored.
+                 Response ignored = new Response();
+                 ignored.CorrelationId = Int32.Parse(errorReceiptId.Substring("ignore:".Length));
+                 return ignored;
+             }
+
+             ExceptionResponse failure = new ExceptionResponse();
+             if(errorReceiptId != null)
+             {
+                 failure.CorrelationId = Int32.Parse(errorReceiptId);
+             }
+
+             BrokerError error = new BrokerError();
+             error.Message = RemoveHeader(headers, "message");
+             error.ExceptionClass = RemoveHeader(headers, "exceptionClass");
+             // TODO is this the right header?
+             failure.Exception = error;
+             return failure;
+         }
+
+         case "MESSAGE":
+             return ReadMessage(command, headers, content);
+     }
+
+     Tracer.Error("Unknown command: " + command + " headers: " + headers);
+     return null;
+ }
+
+ /// <summary>
+ /// Converts a MESSAGE frame into a MessageDispatch.
+ /// </summary>
+ /// <remarks>
+ /// A frame with a "content-length" header becomes a bytes message, any
+ /// other frame a text message decoded with the wire format's encoding.
+ /// Known headers are removed from <paramref name="headers"/> and copied to
+ /// the matching message fields; every header that remains is stored as a
+ /// message property, with "NMSXGroupSeq" converted to an Int32.
+ /// </remarks>
+ protected virtual Command ReadMessage(string command, IDictionary headers, byte[] content)
+ {
+     ActiveMQMessage message = null;
+     if (!headers.Contains("content-length"))
+     {
+         message = new ActiveMQTextMessage(encoding.GetString(content, 0, content.Length));
+     }
+     else
+     {
+         message = new ActiveMQBytesMessage();
+         message.Content = content;
+     }
+
+     // TODO now lets set the various headers
+
+     message.Type = RemoveHeader(headers, "type");
+     message.Destination = StompHelper.ToDestination(RemoveHeader(headers, "destination"));
+     message.ReplyTo = StompHelper.ToDestination(RemoveHeader(headers, "reply-to"));
+     message.TargetConsumerId = StompHelper.ToConsumerId(RemoveHeader(headers, "subscription"));
+     message.CorrelationId = RemoveHeader(headers, "correlation-id");
+     message.MessageId = StompHelper.ToMessageId(RemoveHeader(headers, "message-id"));
+     message.Persistent = StompHelper.ToBool(RemoveHeader(headers, "persistent"), true);
+
+     string header = RemoveHeader(headers, "priority");
+     if (header != null)
+     {
+         message.Priority = Byte.Parse(header);
+     }
+
+     header = RemoveHeader(headers, "timestamp");
+     if (header != null)
+     {
+         message.Timestamp = Int64.Parse(header);
+     }
+
+     header = RemoveHeader(headers, "expires");
+     if (header != null)
+     {
+         message.Expiration = Int64.Parse(header);
+     }
+
+     // now lets add the generic headers
+     foreach (string key in headers.Keys)
+     {
+         Object value = headers[key];
+
+         // lets coerce some standard header extensions
+         if (value != null && key == "NMSXGroupSeq")
+         {
+             value = Int32.Parse(value.ToString());
+         }
+
+         message.Properties[key] = value;
+     }
+
+     MessageDispatch dispatch = new MessageDispatch();
+     dispatch.Message = message;
+     dispatch.ConsumerId = message.TargetConsumerId;
+     dispatch.Destination = message.Destination;
+     return dispatch;
+ }
+
+ /// <summary>
+ /// Writes a CONNECT frame with the client id, login and passcode. A
+ /// response is always requested, and the command id is sent as "request-id".
+ /// </summary>
+ protected virtual void WriteConnectionInfo(ConnectionInfo command, StompFrameStream ss)
+ {
+     // lets force a receipt
+     command.ResponseRequired = true;
+
+     ss.WriteCommand(command, "CONNECT");
+
+     ss.WriteHeader("client-id", command.ClientId);
+     ss.WriteHeader("login", command.UserName);
+     ss.WriteHeader("passcode", command.Password);
+
+     if (command.ResponseRequired)
+     {
+         ss.WriteHeader("request-id", command.CommandId);
+     }
+
+     ss.Flush();
+ }
+
+ /// <summary>
+ /// Writes a DISCONNECT frame. A shutdown command is not expected to
+ /// require a response; debug builds assert this.
+ /// </summary>
+ protected virtual void WriteShutdownInfo(ShutdownInfo command, StompFrameStream ss)
+ {
+     ss.WriteCommand(command, "DISCONNECT");
+
+     System.Diagnostics.Debug.Assert(!command.ResponseRequired);
+
+     ss.Flush();
+ }
+
+ /// <summary>
+ /// Writes a SUBSCRIBE frame for a consumer and remembers its id so that
+ /// the subscription can be found again when it is removed.
+ /// </summary>
+ /// <remarks>
+ /// The subscription always uses "ack: client". Boolean options are only
+ /// sent when they are true.
+ /// </remarks>
+ protected virtual void WriteConsumerInfo(ConsumerInfo command, StompFrameStream ss)
+ {
+     ss.WriteCommand(command, "SUBSCRIBE");
+     ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
+     ss.WriteHeader("id", StompHelper.ToStomp(command.ConsumerId));
+     ss.WriteHeader("durable-subscriber-name", command.SubscriptionName);
+     ss.WriteHeader("selector", command.Selector);
+     if ( command.NoLocal )
+     {
+         ss.WriteHeader("no-local", command.NoLocal);
+     }
+     ss.WriteHeader("ack", "client");
+
+     // ActiveMQ extensions to STOMP
+     ss.WriteHeader("activemq.dispatchAsync", command.DispatchAsync);
+     if ( command.Exclusive )
+     {
+         ss.WriteHeader("activemq.exclusive", command.Exclusive);
+     }
+
+     if( command.SubscriptionName != null )
+     {
+         ss.WriteHeader("activemq.subscriptionName", command.SubscriptionName);
+         // For an older 4.0 broker we need to set this header, with its
+         // misspelled name, so that it gets the subscription as well.
+         ss.WriteHeader("activemq.subcriptionName", command.SubscriptionName);
+     }
+
+     ss.WriteHeader("activemq.maximumPendingMessageLimit", command.MaximumPendingMessageLimit);
+     ss.WriteHeader("activemq.prefetchSize", command.PrefetchSize);
+     ss.WriteHeader("activemq.priority", command.Priority);
+     if ( command.Retroactive )
+     {
+         ss.WriteHeader("activemq.retroactive", command.Retroactive);
+     }
+
+     consumers[command.ConsumerId] = command.ConsumerId;
+     ss.Flush();
+ }
+
+ /// <summary>
+ /// Writes the UNSUBSCRIBE frame or frames for a remove command.
+ /// </summary>
+ /// <remarks>
+ /// Removing a consumer unsubscribes it. Removing a session unsubscribes
+ /// every tracked consumer of that session. Where no subscription is
+ /// affected (a session without consumers, a producer or a connection), an
+ /// UNSUBSCRIBE with an "ignore:" receipt is sent, but only when a response
+ /// is required. Other id types write nothing.
+ /// NOTE(review): one frame stream is reused for every UNSUBSCRIBE of a
+ /// session; confirm that Flush() leaves the stream ready for a new frame.
+ /// </remarks>
+ protected virtual void WriteRemoveInfo(RemoveInfo command, StompFrameStream ss)
+ {
+     object id = command.ObjectId;
+
+     if (id is ConsumerId)
+     {
+         ConsumerId consumerId = id as ConsumerId;
+         ss.WriteCommand(command, "UNSUBSCRIBE");
+         ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
+         ss.Flush();
+         consumers.Remove(consumerId);
+         return;
+     }
+
+     if (id is SessionId)
+     {
+         // When a session is removed, it needs to remove its consumers too.
+         // Find all the consumers that were part of the session.
+         SessionId sessionId = (SessionId) id;
+         ArrayList sessionConsumers = new ArrayList();
+         foreach (DictionaryEntry entry in consumers)
+         {
+             ConsumerId candidate = (ConsumerId) entry.Key;
+             if( sessionId.ConnectionId==candidate.ConnectionId && sessionId.Value==candidate.SessionId )
+             {
+                 sessionConsumers.Add(candidate);
+             }
+         }
+
+         // Un-subscribe them.
+         foreach (ConsumerId consumerId in sessionConsumers)
+         {
+             ss.WriteCommand(command, "UNSUBSCRIBE");
+             ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
+             ss.Flush();
+             consumers.Remove(consumerId);
+         }
+
+         // Nothing was unsubscribed: still produce a receipt if one is expected.
+         if(sessionConsumers.Count == 0 && command.ResponseRequired)
+         {
+             ss.WriteCommand(command, "UNSUBSCRIBE", true);
+             ss.WriteHeader("id", sessionId);
+             ss.Flush();
+         }
+         return;
+     }
+
+     if(id is ProducerId || id is ConnectionId)
+     {
+         if(command.ResponseRequired)
+         {
+             ss.WriteCommand(command, "UNSUBSCRIBE", true);
+             ss.WriteHeader("id", id);
+             ss.Flush();
+         }
+     }
+ }
+
+
+ /// <summary>
+ /// Writes the transaction frame for a local transaction: COMMIT for a
+ /// one-phase commit, ABORT for a rollback and BEGIN for any other type.
+ /// Commit and rollback always request a response. Ids that are not local
+ /// transaction ids write nothing.
+ /// </summary>
+ protected virtual void WriteTransactionInfo(TransactionInfo command, StompFrameStream ss)
+ {
+     TransactionId id = command.TransactionId;
+     if (!(id is LocalTransactionId))
+     {
+         return;
+     }
+
+     TransactionType transactionType = (TransactionType) command.Type;
+     string type;
+
+     if (transactionType == TransactionType.CommitOnePhase)
+     {
+         command.ResponseRequired = true;
+         type = "COMMIT";
+     }
+     else if (transactionType == TransactionType.Rollback)
+     {
+         command.ResponseRequired = true;
+         type = "ABORT";
+     }
+     else
+     {
+         type = "BEGIN";
+     }
+
+     Tracer.Debug(">>> For transaction type: " + transactionType + " we are using command type: " + type);
+     ss.WriteCommand(command, type);
+     ss.WriteHeader("transaction", StompHelper.ToStomp(id));
+     ss.Flush();
+ }
+
+ /// <summary>
+ /// Writes a SEND frame for a message.
+ /// </summary>
+ /// <remarks>
+ /// Optional headers are only written when they carry a value (priority
+ /// only when it is not 4). Text messages are sent as encoded text with no
+ /// "content-length" header; a text message whose Text is null is sent with
+ /// an empty body. All other messages send their raw content together with
+ /// a "content-length" header. Message properties are written as headers.
+ /// </remarks>
+ protected virtual void WriteMessage(ActiveMQMessage command, StompFrameStream ss)
+ {
+     ss.WriteCommand(command, "SEND");
+     ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
+     if (command.ReplyTo != null)
+     {
+         ss.WriteHeader("reply-to", StompHelper.ToStomp(command.ReplyTo));
+     }
+     if (command.CorrelationId != null )
+     {
+         ss.WriteHeader("correlation-id", command.CorrelationId);
+     }
+     if (command.Expiration != 0)
+     {
+         ss.WriteHeader("expires", command.Expiration);
+     }
+     if (command.Priority != 4)
+     {
+         ss.WriteHeader("priority", command.Priority);
+     }
+     if (command.Type != null)
+     {
+         ss.WriteHeader("type", command.Type);
+     }
+     if (command.TransactionId!=null)
+     {
+         ss.WriteHeader("transaction", StompHelper.ToStomp(command.TransactionId));
+     }
+
+     ss.WriteHeader("persistent", command.Persistent);
+
+     // lets force the content to be marshalled
+
+     command.BeforeMarshall(null);
+     if (command is ActiveMQTextMessage)
+     {
+         ActiveMQTextMessage textMessage = command as ActiveMQTextMessage;
+         string text = textMessage.Text;
+
+         // Encoding.GetBytes rejects null, so a message without text is
+         // sent with no body rather than failing.
+         if (text != null)
+         {
+             ss.Content = encoding.GetBytes(text);
+         }
+     }
+     else
+     {
+         ss.Content = command.Content;
+         if(null != command.Content)
+         {
+             ss.ContentLength = command.Content.Length;
+         }
+         else
+         {
+             ss.ContentLength = 0;
+         }
+     }
+
+     IPrimitiveMap map = command.Properties;
+     foreach (string key in map.Keys)
+     {
+         ss.WriteHeader(key, map[key]);
+     }
+     ss.Flush();
+ }
+
+ /// <summary>
+ /// Writes an ACK frame for the last message id of the acknowledgement,
+ /// including the transaction when there is one. Any receipt is requested
+ /// with the "ignore:" prefix.
+ /// </summary>
+ protected virtual void WriteMessageAck(MessageAck command, StompFrameStream ss)
+ {
+     ss.WriteCommand(command, "ACK", true);
+
+     // TODO handle bulk ACKs?
+     string lastMessageId = StompHelper.ToStomp(command.LastMessageId);
+     ss.WriteHeader("message-id", lastMessageId);
+
+     if(command.TransactionId != null)
+     {
+         ss.WriteHeader("transaction", StompHelper.ToStomp(command.TransactionId));
+     }
+
+     ss.Flush();
+ }
+
+ /// <summary>
+ /// Hands a locally generated command to the transport's command handler.
+ /// </summary>
+ /// <remarks>
+ /// When no transport is configured, or the transport has no command
+ /// handler assigned, the command cannot be delivered; this is logged as
+ /// fatal and the command is dropped.
+ /// </remarks>
+ protected virtual void SendCommand(Command command)
+ {
+     if (transport == null)
+     {
+         Tracer.Fatal("No transport configured so cannot return command: " + command);
+         return;
+     }
+
+     // Read the delegate once; it is a settable property and may be null.
+     CommandHandler handler = transport.Command;
+     if (handler == null)
+     {
+         Tracer.Fatal("No command handler configured on the transport so cannot return command: " + command);
+         return;
+     }
+
+     handler(transport, command);
+ }
+
+ /// <summary>
+ /// Takes a header out of the dictionary and returns its value as a string.
+ /// </summary>
+ /// <returns>
+ /// The header value, or null when the header is missing or its value is
+ /// null; in that case the dictionary is left unchanged.
+ /// </returns>
+ protected virtual string RemoveHeader(IDictionary headers, string name)
+ {
+     object found = headers[name];
+     if (found != null)
+     {
+         headers.Remove(name);
+         return found.ToString();
+     }
+
+     return null;
+ }
+
+
+ /// <summary>
+ /// Null-tolerant ToString: returns null for a null value, otherwise the
+ /// value's own string form.
+ /// </summary>
+ protected virtual string ToString(object value)
+ {
+     return value == null ? null : value.ToString();
+ }
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs Thu Dec 3 23:11:03 2009
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Apache.NMS.Stomp.Commands;
+using System;
+using System.Threading;
+using Apache.NMS;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Stomp.Transport
+{
+ /// <summary>
+ /// Handles asynchronous responses: one party assigns the Response when it
+ /// arrives, another blocks on reading it until then or until the timeout
+ /// expires.
+ /// </summary>
+ public class FutureResponse
+ {
+     // Per instance on purpose: a timeout chosen for one request must not
+     // change how long other pending requests wait. Infinite by default.
+     private TimeSpan maxWait = TimeSpan.FromMilliseconds(Timeout.Infinite);
+
+     /// <summary>
+     /// Longest time the Response getter waits for a value to be set.
+     /// </summary>
+     public TimeSpan ResponseTimeout
+     {
+         get { return maxWait; }
+         set { maxWait = value; }
+     }
+
+     private readonly CountDownLatch latch = new CountDownLatch(1);
+     private Response response;
+
+     /// <summary>
+     /// Wait handle of the latch that is released when the response is set.
+     /// </summary>
+     public WaitHandle AsyncWaitHandle
+     {
+         get { return latch.AsyncWaitHandle; }
+     }
+
+     /// <summary>
+     /// The response. Reading blocks until a value has been set or the
+     /// timeout has passed; after a timeout, or if the wait fails, the
+     /// result is null. Setting stores the value and releases any waiter.
+     /// </summary>
+     public Response Response
+     {
+         // Blocks the caller until a value has been set
+         get
+         {
+             bool waitForResponse = false;
+
+             lock(latch)
+             {
+                 if(null == response)
+                 {
+                     waitForResponse = true;
+                 }
+             }
+
+             if(waitForResponse)
+             {
+                 try
+                 {
+                     if(!latch.await(maxWait))
+                     {
+                         // TODO: Throw timeout exception?
+                     }
+                 }
+                 catch (Exception e)
+                 {
+                     Tracer.Error("Caught while waiting on monitor: " + e);
+                 }
+             }
+
+             lock(latch)
+             {
+                 return response;
+             }
+         }
+
+         set
+         {
+             lock(latch)
+             {
+                 response = value;
+             }
+
+             latch.countDown();
+         }
+     }
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/FutureResponse.cs
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs Thu Dec 3 23:11:03 2009
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.Stomp.Commands;
+
+namespace Apache.NMS.Stomp.Transport
+{
+ /// <summary>Callback that receives a command together with the transport it came from.</summary>
+ public delegate void CommandHandler(ITransport sender, Command command);
+
+ /// <summary>Callback that receives an exception together with the transport it came from.</summary>
+ public delegate void ExceptionHandler(ITransport sender, Exception command);
+
+ /// <summary>Callback for the transport's Interrupted notification.</summary>
+ public delegate void InterruptedHandler(ITransport sender);
+
+ /// <summary>Callback for the transport's Resumed notification.</summary>
+ public delegate void ResumedHandler(ITransport sender);
+
+ /// <summary>
+ /// Represents the logical networking transport layer. Transports implement the
+ /// low level, protocol specific portion of the communication between the client
+ /// and a broker, such as TCP or UDP. Transports make use of WireFormat objects
+ /// to translate the canonical commands used in this client into the wire level
+ /// packets that are sent to the broker or service the transport connects to.
+ /// </summary>
+ public interface ITransport : IStartable, IDisposable, IStoppable
+ {
+     /// <summary>
+     /// Sends a Command on the wire without waiting for any response from
+     /// the receiver.
+     /// </summary>
+     /// <param name="command">A <see cref="Command"/></param>
+     void Oneway(Command command);
+
+     /// <summary>
+     /// Sends a Command that requires a response from the broker, but does
+     /// not wait for it. The returned FutureResponse lets the caller wait
+     /// for the broker's response later.
+     /// </summary>
+     /// <param name="command">A <see cref="Command"/></param>
+     /// <returns>A <see cref="FutureResponse"/></returns>
+     FutureResponse AsyncRequest(Command command);
+
+     /// <summary>
+     /// Sends a Command to the broker and waits indefinitely for the
+     /// Response to that Command.
+     /// </summary>
+     /// <param name="command">A <see cref="Command"/></param>
+     /// <returns>A <see cref="Response"/></returns>
+     Response Request(Command command);
+
+     /// <summary>
+     /// Sends a Command to the broker and waits for a response for at most
+     /// the given TimeSpan.
+     /// </summary>
+     /// <param name="command">A <see cref="Command"/></param>
+     /// <param name="timeout">A <see cref="TimeSpan"/></param>
+     /// <returns>A <see cref="Response"/></returns>
+     Response Request(Command command, TimeSpan timeout);
+
+     /// <summary>
+     /// Finds a specific type of Transport in the chain of Transports that
+     /// was created, so that the caller can get or set properties on that
+     /// particular instance. Returns null when the requested type is not in
+     /// the chain.
+     /// </summary>
+     /// <param name="type">A <see cref="Type"/></param>
+     /// <returns>A <see cref="System.Object"/></returns>
+     Object Narrow(Type type);
+
+     /// <value>
+     /// The time the Transport waits before considering a request to have
+     /// failed and throwing an exception.
+     /// </value>
+     TimeSpan RequestTimeout { get; set; }
+
+     /// <value>
+     /// Handler that is given commands together with this transport.
+     /// </value>
+     CommandHandler Command { get; set; }
+
+     /// <value>
+     /// Handler that is given exceptions together with this transport.
+     /// </value>
+     ExceptionHandler Exception { get; set; }
+
+     /// <value>
+     /// Handler for the Interrupted notification.
+     /// </value>
+     InterruptedHandler Interrupted { get; set; }
+
+     /// <value>
+     /// Handler for the Resumed notification.
+     /// </value>
+     ResumedHandler Resumed { get; set; }
+
+     /// <value>
+     /// Indicates whether this Transport has already been disposed and can
+     /// no longer be used.
+     /// </value>
+     bool IsDisposed { get; }
+
+     /// <value>
+     /// Indicates whether this Transport is fault tolerant. A fault tolerant
+     /// Transport handles low level connection errors internally, so that a
+     /// client remains unaware of wire level disconnection and reconnection.
+     /// </value>
+     bool IsFaultTolerant { get; }
+
+     /// <value>
+     /// Indicates whether the Transport is currently connected to its
+     /// assigned URI.
+     /// </value>
+     bool IsConnected { get; }
+
+     /// <value>
+     /// The remote address that this transport is currently connected to.
+     /// </value>
+     Uri RemoteAddress { get; }
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransportFactory.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransportFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransportFactory.cs Thu Dec 3 23:11:03 2009
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.Stomp.Transport
+{
+ /// <summary>
+ /// Callback that receives a transport together with a URI.
+ /// </summary>
+ public delegate void SetTransport(ITransport transport, Uri uri);
+
+ /// <summary>
+ /// Factory that produces transports for a given location.
+ /// </summary>
+ public interface ITransportFactory
+ {
+     /// <summary>Creates a transport for the given location.</summary>
+     ITransport CreateTransport(Uri location);
+
+     /// <summary>Produces a transport for the given location.</summary>
+     ITransport CompositeConnect(Uri location);
+
+     /// <summary>
+     /// Produces a transport for the given location, taking a
+     /// <see cref="SetTransport"/> callback.
+     /// </summary>
+     ITransport CompositeConnect(Uri location, SetTransport setTransport);
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransportFactory.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/IWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/IWireFormat.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/IWireFormat.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/IWireFormat.cs Thu Dec 3 23:11:03 2009
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.IO;
+
+namespace Apache.NMS.Stomp.Transport
+{
+ /// <summary>
+ /// Contract for converting command objects to and from their
+ /// representation on a binary IO stream.
+ /// </summary>
+ public interface IWireFormat
+ {
+     /// <summary>
+     /// Writes the supplied command object to the given binary writer.
+     /// </summary>
+     void Marshal(object o, BinaryWriter ds);
+
+     /// <summary>
+     /// Reads the next command object from the given binary reader and
+     /// returns it.
+     /// </summary>
+     object Unmarshal(BinaryReader dis);
+
+     /// <summary>
+     /// The transport associated with this wire format.
+     /// </summary>
+     ITransport Transport { get; set; }
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/IWireFormat.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/LoggingTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/LoggingTransport.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/LoggingTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/LoggingTransport.cs Thu Dec 3 23:11:03 2009
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Apache.NMS;
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Stomp.Transport;
+using System;
+
+namespace Apache.NMS.Stomp.Transport
+{
+
+ /// <summary>
+ /// Transport filter that writes every command and exception passing
+ /// through it to the NMS Tracer and then hands it on unchanged.
+ /// </summary>
+ public class LoggingTransport : TransportFilter
+ {
+     public LoggingTransport(ITransport next)
+         : base(next)
+     {
+     }
+
+     /// <summary>
+     /// Traces an inbound command at Info level, then passes it to the
+     /// command handler.
+     /// </summary>
+     protected override void OnCommand(ITransport sender, Command command)
+     {
+         string traceLine = "RECEIVED: " + command;
+         Tracer.Info(traceLine);
+         commandHandler(sender, command);
+     }
+
+     /// <summary>
+     /// Traces an inbound exception at Error level, then passes it to the
+     /// exception handler.
+     /// </summary>
+     protected override void OnException(ITransport sender, Exception error)
+     {
+         string traceLine = "RECEIVED Exception: " + error;
+         Tracer.Error(traceLine);
+         exceptionHandler(sender, error);
+     }
+
+     /// <summary>
+     /// Traces an outbound command at Info level, then sends it through the
+     /// next transport in the chain.
+     /// </summary>
+     public override void Oneway(Command command)
+     {
+         string traceLine = "SENDING: " + command;
+         Tracer.Info(traceLine);
+         next.Oneway(command);
+     }
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/LoggingTransport.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/MutexTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/MutexTransport.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/MutexTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/MutexTransport.cs Thu Dec 3 23:11:03 2009
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Stomp.Transport;
+using System;
+
+namespace Apache.NMS.Stomp.Transport
+{
+ /// <summary>
+ /// Transport filter that serialises Oneway, AsyncRequest and Request calls:
+ /// each of them runs while holding one private lock, so only a single
+ /// caller at a time reaches the next transport through this filter.
+ /// </summary>
+ public class MutexTransport : TransportFilter
+ {
+     // Private monitor object shared by all three send paths below.
+     private readonly object sendGuard = new object();
+
+     public MutexTransport(ITransport next)
+         : base(next)
+     {
+     }
+
+     /// <summary>
+     /// Sends the command through the next transport while holding the lock.
+     /// </summary>
+     public override void Oneway(Command command)
+     {
+         lock(sendGuard)
+         {
+             next.Oneway(command);
+         }
+     }
+
+     /// <summary>
+     /// Delegates to the base AsyncRequest while holding the lock.
+     /// </summary>
+     public override FutureResponse AsyncRequest(Command command)
+     {
+         lock(sendGuard)
+         {
+             FutureResponse pending = base.AsyncRequest(command);
+             return pending;
+         }
+     }
+
+     /// <summary>
+     /// Delegates to the base Request while holding the lock; the lock is
+     /// held until the base call returns or throws.
+     /// </summary>
+     public override Response Request(Command command, TimeSpan timeout)
+     {
+         lock(sendGuard)
+         {
+             Response reply = base.Request(command, timeout);
+             return reply;
+         }
+     }
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/MutexTransport.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ResponseCorrelator.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ResponseCorrelator.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ResponseCorrelator.cs Thu Dec 3 23:11:03 2009
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Collections;
+
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Stomp.Transport;
+using Apache.NMS;
+
+namespace Apache.NMS.Stomp.Transport
+{
+ /// <summary>
+ /// A Transport that correlates asynchronous send/receive messages into
+ /// single request/response. Every request is stamped with a fresh command
+ /// id and registered as a FutureResponse; when a Response arrives whose
+ /// CorrelationId matches a registered id, it is assigned to that future.
+ /// </summary>
+ public class ResponseCorrelator : TransportFilter
+ {
+     // Outstanding requests keyed by command id. The synchronized wrapper
+     // protects individual operations only; enumeration and compound
+     // operations must take requestMap.SyncRoot explicitly.
+     private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable());
+     private int nextCommandId;
+
+     public ResponseCorrelator(ITransport next) : base(next)
+     {
+     }
+
+     /// <summary>
+     /// Forwards the exception to the base handler and then completes every
+     /// outstanding request with an ExceptionResponse whose BrokerError
+     /// carries the exception's message.
+     /// </summary>
+     /// <param name="sender">The transport that reported the failure.</param>
+     /// <param name="command">The exception that was reported (the parameter
+     /// name is kept for interface compatibility).</param>
+     protected override void OnException(ITransport sender, Exception command)
+     {
+         base.OnException(sender, command);
+
+         // Enumerating a synchronized Hashtable is not thread-safe on its
+         // own: a concurrent AsyncRequest/OnCommand would invalidate the
+         // enumerator. Snapshot and clear atomically under SyncRoot so that
+         // no request registered in between is dropped without an answer.
+         ArrayList pending;
+         lock(requestMap.SyncRoot)
+         {
+             pending = new ArrayList(requestMap.Values);
+             requestMap.Clear();
+         }
+
+         // Complete the futures outside the lock so the map is not held
+         // while each future is being assigned its response.
+         foreach(FutureResponse future in pending)
+         {
+             BrokerError error = new BrokerError();
+             error.Message = command.Message;
+
+             ExceptionResponse response = new ExceptionResponse();
+             response.Exception = error;
+             future.Response = response;
+         }
+     }
+
+     /// <summary>
+     /// Atomically produces the next command id.
+     /// </summary>
+     internal int GetNextCommandId()
+     {
+         return Interlocked.Increment(ref nextCommandId);
+     }
+
+     /// <summary>
+     /// Sends a command without registering for a response. A command whose
+     /// CommandId is still 0 is given a fresh id first.
+     /// </summary>
+     public override void Oneway(Command command)
+     {
+         if(0 == command.CommandId)
+         {
+             command.CommandId = GetNextCommandId();
+         }
+
+         next.Oneway(command);
+     }
+
+     /// <summary>
+     /// Assigns a fresh command id, marks the command as requiring a
+     /// response, registers a FutureResponse under that id and sends the
+     /// command.
+     /// </summary>
+     /// <returns>The future that OnCommand/OnException will complete.</returns>
+     public override FutureResponse AsyncRequest(Command command)
+     {
+         int commandId = GetNextCommandId();
+
+         command.CommandId = commandId;
+         command.ResponseRequired = true;
+         FutureResponse future = new FutureResponse();
+         requestMap[commandId] = future;
+
+         try
+         {
+             next.Oneway(command);
+         }
+         catch
+         {
+             // The caller never receives the future when the send fails, so
+             // do not leave an orphaned entry behind in the map.
+             requestMap.Remove(commandId);
+             throw;
+         }
+
+         return future;
+     }
+
+     /// <summary>
+     /// Sends the command via AsyncRequest, applies the timeout to the
+     /// returned future and reads its Response.
+     /// </summary>
+     /// <returns>The response read from the future.</returns>
+     /// <exception cref="BrokerException">
+     /// The response was an ExceptionResponse.
+     /// </exception>
+     public override Response Request(Command command, TimeSpan timeout)
+     {
+         FutureResponse future = AsyncRequest(command);
+         future.ResponseTimeout = timeout;
+         Response response = future.Response;
+
+         ExceptionResponse er = response as ExceptionResponse;
+         if(er != null)
+         {
+             BrokerError brokerError = er.Exception;
+
+             if(brokerError == null)
+             {
+                 throw new BrokerException();
+             }
+
+             throw new BrokerException(brokerError);
+         }
+
+         return response;
+     }
+
+     /// <summary>
+     /// Routes an inbound command. A Response is matched against the
+     /// outstanding requests by CorrelationId; anything else is passed
+     /// straight to the command handler.
+     /// </summary>
+     protected override void OnCommand(ITransport sender, Command command)
+     {
+         Response response = command as Response;
+
+         if(response == null)
+         {
+             // Every non-response command, ShutdownInfo included, is simply
+             // forwarded to the registered command handler.
+             this.commandHandler(sender, command);
+             return;
+         }
+
+         int correlationId = response.CorrelationId;
+         FutureResponse future = (FutureResponse) requestMap[correlationId];
+
+         if(future == null)
+         {
+             Tracer.Error("Unknown response ID: " + response.CommandId + " for response: " + response);
+             return;
+         }
+
+         requestMap.Remove(correlationId);
+         future.Response = response;
+
+         // An error response is additionally reported through the exception
+         // handler, with this correlator as the sender.
+         ExceptionResponse er = response as ExceptionResponse;
+         if(er != null)
+         {
+             BrokerError brokerError = er.Exception;
+             BrokerException exception = new BrokerException(brokerError);
+             this.exceptionHandler(this, exception);
+         }
+     }
+ }
+}
+
+
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
------------------------------------------------------------------------------
svn:eol-style = native