You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/11/25 23:19:12 UTC
svn commit: r1545423 - in /activemq/activemq-dotnet/Apache.NMS.MQTT/trunk:
./ src/main/csharp/ src/main/csharp/Protocol/ src/main/csharp/Transport/
src/main/csharp/Transport/Tcp/ src/test/csharp/
Author: tabish
Date: Mon Nov 25 22:19:12 2013
New Revision: 1545423
URL: http://svn.apache.org/r1545423
Log:
https://issues.apache.org/jira/browse/AMQNET-458
Implementation
Added:
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/IWireFormat.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/LoggingTransport.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/MutexTransport.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransport.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransportFactory.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/TransportFilter.cs (with props)
Modified:
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs?rev=1545423&r1=1545422&r2=1545423&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs Mon Nov 25 22:19:12 2013
@@ -22,6 +22,6 @@ using System.Runtime.InteropServices;
[assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2013 Apache Software Foundation")]
[assembly: AssemblyTrademarkAttribute("")]
[assembly: AssemblyCultureAttribute("")]
-[assembly: AssemblyVersionAttribute("1.7.0.3244")]
+[assembly: AssemblyVersionAttribute("1.7.0.3250")]
[assembly: AssemblyInformationalVersionAttribute("1.7.0")]
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs?rev=1545423&r1=1545422&r2=1545423&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs Mon Nov 25 22:19:12 2013
@@ -107,7 +107,7 @@ namespace Apache.NMS.MQTT
}
listener += value;
- //this.session.Redispatch(this.unconsumedMessages);
+ this.session.Redispatch(this.unconsumedMessages);
if(wasStarted)
{
@@ -219,7 +219,7 @@ namespace Apache.NMS.MQTT
MessageDispatch dispatch = this.unconsumedMessages.DequeueNoWait();
if(dispatch != null)
{
- //this.Dispatch(dispatch);
+ this.Dispatch(dispatch);
return true;
}
}
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs?rev=1545423&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs Mon Nov 25 22:19:12 2013
@@ -0,0 +1,47 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+using System;
+using System.IO;
+using Apache.NMS.MQTT.Transport;
+
+namespace Apache.NMS.MQTT.Protocol
+{
+ /// <summary>
+ /// Placeholder MQTT implementation of <see cref="IWireFormat"/>. Nothing is
+ /// encoded or decoded yet: Marshal writes nothing and Unmarshal always returns null.
+ /// </summary>
+ public class MQTTWireFormat : IWireFormat
+ {
+     // Transport handed to this wire format through the Transport property.
+     private ITransport owner;
+
+     /// <summary>
+     /// Gets or sets the transport associated with this wire format.
+     /// </summary>
+     public ITransport Transport
+     {
+         get { return owner; }
+         set { owner = value; }
+     }
+
+     /// <summary>
+     /// Currently a no-op: nothing is written to <paramref name="ds"/>.
+     /// </summary>
+     /// <param name="o">The object to marshal (ignored for now).</param>
+     /// <param name="ds">The writer to marshal onto (untouched for now).</param>
+     public void Marshal(object o, BinaryWriter ds)
+     {
+     }
+
+     /// <summary>
+     /// Currently reads nothing from <paramref name="dis"/>.
+     /// </summary>
+     /// <param name="dis">The reader to unmarshal from (untouched for now).</param>
+     /// <returns>Always null.</returns>
+     public object Unmarshal(BinaryReader dis)
+     {
+         return null;
+     }
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs?rev=1545423&r1=1545422&r2=1545423&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs Mon Nov 25 22:19:12 2013
@@ -18,6 +18,7 @@ using System;
using System.Collections;
using System.Threading;
using Apache.NMS.MQTT.Messages;
+using Apache.NMS.MQTT.Util;
namespace Apache.NMS.MQTT
{
@@ -512,6 +513,17 @@ namespace Apache.NMS.MQTT
}
}
+ /// <summary>
+ /// Drains every pending dispatch from the given channel, reverses the drained
+ /// array and passes each element in turn to the session executor's ExecuteFirst.
+ /// </summary>
+ /// <param name="channel">The channel whose pending dispatches are removed.</param>
+ internal void Redispatch(MessageDispatchChannel channel)
+ {
+     MessageDispatch[] drained = channel.RemoveAll();
+
+     // Presumably reversed so that repeated ExecuteFirst calls leave the
+     // dispatches in their original order - confirm against the executor.
+     System.Array.Reverse(drained);
+
+     for(int index = 0; index < drained.Length; ++index)
+     {
+         this.executor.ExecuteFirst(drained[index]);
+     }
+ }
+
public void Dispatch(MessageDispatch dispatch)
{
if(this.executor != null)
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs?rev=1545423&r1=1545422&r2=1545423&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs Mon Nov 25 22:19:12 2013
@@ -1,20 +1,19 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
using System;
namespace Apache.NMS.MQTT.Transport
@@ -178,6 +177,14 @@ namespace Apache.NMS.MQTT.Transport
/// </param>
void UpdateURIs(bool rebalance, Uri[] updatedURIs);
+ /// <summary>
+ /// Gets the IWireFormat instance this transport uses when marshalling and
+ /// unmarshalling Command objects.
+ /// </summary>
+ IWireFormat WireFormat { get; }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs?rev=1545423&r1=1545422&r2=1545423&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs Mon Nov 25 22:19:12 2013
@@ -1,30 +1,26 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
using System;
namespace Apache.NMS.MQTT.Transport
{
- public delegate void SetTransport(ITransport transport, Uri uri);
-
public interface ITransportFactory
{
ITransport CreateTransport(Uri location);
ITransport CompositeConnect(Uri location);
- ITransport CompositeConnect(Uri location, SetTransport setTransport);
}
}
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/IWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/IWireFormat.cs?rev=1545423&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/IWireFormat.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/IWireFormat.cs Mon Nov 25 22:19:12 2013
@@ -0,0 +1,42 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+using System;
+using System.IO;
+
+namespace Apache.NMS.MQTT.Transport
+{
+ /// <summary>
+ /// Contract for converting command objects to and from a binary IO stream.
+ /// </summary>
+ public interface IWireFormat
+ {
+     /// <summary>
+     /// Gets or sets the transport associated with this wire format.
+     /// </summary>
+     ITransport Transport { get; set; }
+
+     /// <summary>
+     /// Writes the given command object onto the stream.
+     /// </summary>
+     /// <param name="o">The command object to marshal.</param>
+     /// <param name="ds">The writer wrapping the destination stream.</param>
+     void Marshal(object o, BinaryWriter ds);
+
+     /// <summary>
+     /// Reads the next command object from the stream.
+     /// </summary>
+     /// <param name="dis">The reader wrapping the source stream.</param>
+     /// <returns>The unmarshalled command object.</returns>
+     object Unmarshal(BinaryReader dis);
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/IWireFormat.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/IWireFormat.cs
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/LoggingTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/LoggingTransport.cs?rev=1545423&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/LoggingTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/LoggingTransport.cs Mon Nov 25 22:19:12 2013
@@ -0,0 +1,50 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+using System;
+using Apache.NMS.MQTT.Commands;
+
+namespace Apache.NMS.MQTT.Transport
+{
+ /// <summary>
+ /// Transport filter that traces each outgoing command, each incoming command and
+ /// each reported exception before passing it on unchanged.
+ /// </summary>
+ public class LoggingTransport : TransportFilter
+ {
+     /// <summary>
+     /// Wraps the given transport.
+     /// </summary>
+     /// <param name="next">The transport that outgoing commands are forwarded to.</param>
+     public LoggingTransport(ITransport next) : base(next)
+     {
+     }
+
+     /// <summary>
+     /// Traces the outgoing command at Info level, then sends it on the wrapped transport.
+     /// </summary>
+     public override void Oneway(Command command)
+     {
+         string outgoing = "SENDING: " + command;
+         Tracer.Info(outgoing);
+         this.next.Oneway(command);
+     }
+
+     /// <summary>
+     /// Traces the incoming command at Info level, then invokes the command handler.
+     /// </summary>
+     protected override void OnCommand(ITransport sender, Command command)
+     {
+         string incoming = "RECEIVED: " + command;
+         Tracer.Info(incoming);
+         this.commandHandler(sender, command);
+     }
+
+     /// <summary>
+     /// Traces the reported exception at Error level, then invokes the exception handler.
+     /// </summary>
+     protected override void OnException(ITransport sender, Exception error)
+     {
+         string failure = "RECEIVED Exception: " + error;
+         Tracer.Error(failure);
+         this.exceptionHandler(sender, error);
+     }
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/LoggingTransport.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/MutexTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/MutexTransport.cs?rev=1545423&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/MutexTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/MutexTransport.cs Mon Nov 25 22:19:12 2013
@@ -0,0 +1,102 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+using System;
+using System.Threading;
+
+namespace Apache.NMS.MQTT.Transport
+{
+ /// <summary>
+ /// A Transport which guards access to the next transport using a mutex, so that
+ /// only one thread at a time runs Oneway, AsyncRequest or Request through this filter.
+ /// </summary>
+ public class MutexTransport : TransportFilter
+ {
+     private readonly object transmissionLock = new object();
+
+     /// <summary>
+     /// Wraps the given transport.
+     /// </summary>
+     /// <param name="next">The transport whose send operations are serialized.</param>
+     public MutexTransport(ITransport next) : base(next)
+     {
+     }
+
+     /// <summary>
+     /// Acquires the transmission lock. The caller must release it with Monitor.Exit.
+     /// </summary>
+     /// <param name="timeout">
+     /// Maximum time to wait in milliseconds. Zero or a negative value waits indefinitely.
+     /// </param>
+     /// <exception cref="IOException">
+     /// The lock could not be acquired before the timeout elapsed.
+     /// </exception>
+     private void GetTransmissionLock(int timeout)
+     {
+         if(timeout <= 0)
+         {
+             Monitor.Enter(transmissionLock);
+             return;
+         }
+
+         // Use UTC so that a daylight-saving or time-zone change during the wait
+         // cannot shorten or lengthen the effective timeout.
+         DateTime deadline = DateTime.UtcNow + TimeSpan.FromMilliseconds(timeout);
+         int waitCount = 1;
+
+         while(!Monitor.TryEnter(transmissionLock))
+         {
+             double remaining = (deadline - DateTime.UtcNow).TotalMilliseconds;
+
+             if(remaining < 0)
+             {
+                 throw new IOException(string.Format("Oneway timed out after {0} milliseconds.", timeout));
+             }
+
+             // Back off from being overly aggressive. Having too many threads
+             // aggressively trying to get the lock pegs the CPU. The sleep is
+             // clamped to the time left so the back-off cannot overshoot the deadline.
+             int backoff = 3 * (waitCount++);
+             int sleepFor = (int) Math.Min((double) backoff, remaining);
+             Thread.Sleep(Math.Max(1, sleepFor));
+         }
+     }
+
+     /// <summary>
+     /// Sends the command on the next transport while holding the transmission lock.
+     /// The lock wait is bounded by the next transport's Timeout.
+     /// </summary>
+     public override void Oneway(Command command)
+     {
+         GetTransmissionLock(this.next.Timeout);
+         try
+         {
+             base.Oneway(command);
+         }
+         finally
+         {
+             Monitor.Exit(transmissionLock);
+         }
+     }
+
+     /// <summary>
+     /// Issues an asynchronous request while holding the transmission lock.
+     /// The lock wait is bounded by the next transport's AsyncTimeout.
+     /// </summary>
+     public override FutureResponse AsyncRequest(Command command)
+     {
+         GetTransmissionLock(this.next.AsyncTimeout);
+         try
+         {
+             return base.AsyncRequest(command);
+         }
+         finally
+         {
+             Monitor.Exit(transmissionLock);
+         }
+     }
+
+     /// <summary>
+     /// Issues a synchronous request while holding the transmission lock.
+     /// The same timeout bounds the lock wait and is passed on to the request.
+     /// </summary>
+     public override Response Request(Command command, TimeSpan timeout)
+     {
+         GetTransmissionLock((int) timeout.TotalMilliseconds);
+         try
+         {
+             return base.Request(command, timeout);
+         }
+         finally
+         {
+             Monitor.Exit(transmissionLock);
+         }
+     }
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/MutexTransport.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs?rev=1545423&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs Mon Nov 25 22:19:12 2013
@@ -0,0 +1,158 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+using System;
+using System.Collections;
+using System.Threading;
+using Apache.NMS.MQTT.Commands;
+
+namespace Apache.NMS.MQTT.Transport
+{
+ /// <summary>
+ /// A Transport that correlates asynchronous send/receive messages into single request/response.
+ /// Each request is assigned an id and its FutureResponse is kept in a map until a
+ /// response carrying that id as its CorrelationId arrives.
+ /// </summary>
+ public class ResponseCorrelator : TransportFilter
+ {
+     // Pending requests keyed by command id (int -> FutureResponse).
+     private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable());
+     private int nextCommandId;
+     // First error passed to Dispose; once set, no new requests are registered.
+     private Exception error;
+
+     /// <summary>
+     /// Wraps the given transport.
+     /// </summary>
+     /// <param name="next">The transport that commands are sent on.</param>
+     public ResponseCorrelator(ITransport next) : base(next)
+     {
+     }
+
+     /// <summary>
+     /// Discards all pending requests, then passes the exception to the base filter.
+     /// </summary>
+     protected override void OnException(ITransport sender, Exception command)
+     {
+         Dispose(command);
+         base.OnException(sender, command);
+     }
+
+     /// <summary>
+     /// Returns the next command id, incremented atomically.
+     /// </summary>
+     internal int GetNextCommandId()
+     {
+         return Interlocked.Increment(ref nextCommandId);
+     }
+
+     /// <summary>
+     /// Sends the command on the next transport without registering a pending request.
+     /// </summary>
+     public override void Oneway(Command command)
+     {
+ //            command.CommandId = GetNextCommandId();
+ //            command.ResponseRequired = false;
+         next.Oneway(command);
+     }
+
+     /// <summary>
+     /// Registers a FutureResponse for the command and sends it on the next transport.
+     /// </summary>
+     /// <param name="command">The command to send.</param>
+     /// <returns>
+     /// The future for the eventual response. If an error was recorded earlier, the
+     /// future is returned unregistered and the command is not sent.
+     /// </returns>
+     public override FutureResponse AsyncRequest(Command command)
+     {
+         int commandId = GetNextCommandId();
+
+ //            command.CommandId = commandId;
+ //            command.ResponseRequired = true;
+         FutureResponse future = new FutureResponse();
+         Exception priorError = null;
+         lock(requestMap.SyncRoot)
+         {
+             priorError = this.error;
+             if(priorError == null)
+             {
+                 requestMap[commandId] = future;
+             }
+         }
+
+         if(priorError != null)
+         {
+             // NOTE(review): with the code below disabled the future is returned
+             // without a response - confirm how callers observe the prior error.
+ //                BrokerError brError = new BrokerError();
+ //                brError.Message = priorError.Message;
+ //                ExceptionResponse response = new ExceptionResponse();
+ //                response.Exception = brError;
+ //                future.Response = response;
+             return future;
+         }
+
+         try
+         {
+             next.Oneway(command);
+         }
+         catch
+         {
+             // The command never left, so no response can arrive for it. Drop the
+             // pending entry so it does not stay in the map, then let the failure
+             // propagate to the caller unchanged.
+             requestMap.Remove(commandId);
+             throw;
+         }
+
+         return future;
+     }
+
+     /// <summary>
+     /// Sends the command via AsyncRequest, applies the timeout to the resulting
+     /// future and returns the future's Response.
+     /// </summary>
+     public override Response Request(Command command, TimeSpan timeout)
+     {
+         FutureResponse future = AsyncRequest(command);
+         future.ResponseTimeout = timeout;
+         Response response = future.Response;
+         return response;
+     }
+
+     /// <summary>
+     /// Routes responses to the matching pending future; every other command goes to
+     /// the command handler.
+     /// </summary>
+     protected override void OnCommand(ITransport sender, Command command)
+     {
+         if(command.IsResponse)
+         {
+             Response response = (Response) command;
+             int correlationId = response.CorrelationId;
+             FutureResponse future = (FutureResponse) requestMap[correlationId];
+
+             if(future != null)
+             {
+                 requestMap.Remove(correlationId);
+                 future.Response = response;
+             }
+             else
+             {
+                 if(Tracer.IsDebugEnabled)
+                 {
+                     Tracer.Debug("Unknown response ID: " + response.CorrelationId + " for response: " + response);
+                 }
+             }
+         }
+         else
+         {
+             this.commandHandler(sender, command);
+         }
+     }
+
+     /// <summary>
+     /// Discards all pending requests with a "Stopped" error, then stops the base filter.
+     /// </summary>
+     public override void Stop()
+     {
+         this.Dispose(new IOException("Stopped"));
+         base.Stop();
+     }
+
+     /// <summary>
+     /// Records the first error seen and clears the pending request map. Later calls
+     /// are no-ops because the error is already set.
+     /// </summary>
+     private void Dispose(Exception error)
+     {
+         ArrayList requests = null;
+
+         lock(requestMap.SyncRoot)
+         {
+             if(this.error == null)
+             {
+                 this.error = error;
+                 requests = new ArrayList(requestMap.Values);
+                 requestMap.Clear();
+             }
+         }
+
+         if(requests != null)
+         {
+             // NOTE(review): with the code below disabled, pending futures are dropped
+             // without being completed - confirm that waiters time out on their own.
+             foreach(FutureResponse future in requests)
+             {
+ //                    BrokerError brError = new BrokerError();
+ //                    brError.Message = error.Message;
+ //                    ExceptionResponse response = new ExceptionResponse();
+ //                    response.Exception = brError;
+ //                    future.Response = response;
+             }
+         }
+     }
+ }
+}
+
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransport.cs?rev=1545423&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransport.cs Mon Nov 25 22:19:12 2013
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.IO;
+using System.Net.Sockets;
+using System.Net.Security;
+using System.Security.Authentication;
+using System.Security.Cryptography.X509Certificates;
+
+namespace Apache.NMS.MQTT.Transport.Tcp
+{
+ /// <summary>
+ /// A TcpTransport whose socket stream is wrapped in an SslStream that is
+ /// authenticated as a client before it is handed back to the base transport.
+ /// </summary>
+ public class SslTransport : TcpTransport
+ {
+     private string serverName;
+     private string clientCertSubject;
+     private string clientCertFilename;
+     private string clientCertPassword;
+     private string keyStoreName;
+     private string keyStoreLocation;
+     private bool acceptInvalidBrokerCert = false;
+
+     // Set only after a successful handshake; see CreateSocketStream.
+     private SslStream sslStream;
+
+     public SslTransport(Uri location, Socket socket, IWireFormat wireFormat) :
+         base(location, socket, wireFormat)
+     {
+     }
+
+     ~SslTransport()
+     {
+         Dispose(false);
+     }
+
+     /// <summary>
+     /// Indicates the name of the Server's Certificate.  By default the Host name
+     /// of the remote server is used, however if this doesn't match the name of the
+     /// Server's certificate then this option can be set to override the default.
+     /// </summary>
+     public string ServerName
+     {
+         get { return this.serverName; }
+         set { this.serverName = value; }
+     }
+
+     /// <summary>
+     /// Subject used to pick the client certificate when more than one local
+     /// certificate is available. The comparison ignores case.
+     /// </summary>
+     public string ClientCertSubject
+     {
+         get { return this.clientCertSubject; }
+         set { this.clientCertSubject = value; }
+     }
+
+     /// <summary>
+     /// Indicates the location of the Client Certificate to use when the Broker
+     /// is configured for Client Auth (not common).  The SslTransport will supply
+     /// this certificate to the SslStream via the SelectLocalCertificate method.
+     /// </summary>
+     public string ClientCertFilename
+     {
+         get { return this.clientCertFilename; }
+         set { this.clientCertFilename = value; }
+     }
+
+     /// <summary>
+     /// Password for the Client Certificate specified via configuration.
+     /// </summary>
+     public string ClientCertPassword
+     {
+         get { return this.clientCertPassword; }
+         set { this.clientCertPassword = value; }
+     }
+
+     /// <summary>
+     /// Indicates if the SslTransport should ignore any errors in the supplied Broker
+     /// certificate and connect anyway, this is useful in testing with a default AMQ
+     /// broker certificate that is self signed.
+     /// </summary>
+     public bool AcceptInvalidBrokerCert
+     {
+         get { return this.acceptInvalidBrokerCert; }
+         set { this.acceptInvalidBrokerCert = value; }
+     }
+
+     /// <summary>
+     /// Name of the certificate store to read client certificates from when no
+     /// certificate file is configured. Defaults to the "My" store when empty.
+     /// </summary>
+     public string KeyStoreName
+     {
+         get { return this.keyStoreName; }
+         set { this.keyStoreName = value; }
+     }
+
+     /// <summary>
+     /// Location of the certificate store: "CurrentUser" (the default when empty)
+     /// or "LocalMachine", compared ignoring case.
+     /// </summary>
+     public string KeyStoreLocation
+     {
+         get { return this.keyStoreLocation; }
+         set { this.keyStoreLocation = value; }
+     }
+
+     /// <summary>
+     /// Returns the authenticated SslStream over the socket, performing the client
+     /// handshake on first use and reusing the stream afterwards.
+     /// </summary>
+     /// <returns>The authenticated SslStream.</returns>
+     /// <remarks>
+     /// A handshake failure is logged and rethrown. The failed stream is closed and
+     /// not cached, so a later call starts a fresh handshake.
+     /// </remarks>
+     protected override Stream CreateSocketStream()
+     {
+         if(this.sslStream != null)
+         {
+             return this.sslStream;
+         }
+
+         SslStream stream = new SslStream(
+             new NetworkStream(this.socket),
+             false,
+             new RemoteCertificateValidationCallback(ValidateServerCertificate),
+             new LocalCertificateSelectionCallback(SelectLocalCertificate) );
+
+         try
+         {
+             string remoteCertName = this.serverName ?? this.RemoteAddress.Host;
+             Tracer.Debug("Authorizing as Client for Server: " + remoteCertName);
+             // NOTE(review): SslProtocols.Default is documented as allowing only
+             // SSL 3.0 / TLS 1.0 - consider making the protocol configurable.
+             stream.AuthenticateAsClient(remoteCertName, LoadCertificates(), SslProtocols.Default, false);
+             Tracer.Debug("Server is Authenticated = " + stream.IsAuthenticated);
+             Tracer.Debug("Server is Encrypted = " + stream.IsEncrypted);
+         }
+         catch(Exception e)
+         {
+             Tracer.ErrorFormat("Exception: {0}", e.Message);
+             if(e.InnerException != null)
+             {
+                 Tracer.ErrorFormat("Inner exception: {0}", e.InnerException.Message);
+             }
+             Tracer.Error("Authentication failed - closing the connection.");
+
+             // Release the half-initialized stream instead of caching it. The
+             // NetworkStream was created without socket ownership, so the socket
+             // itself is left for the caller to clean up.
+             stream.Close();
+
+             throw;
+         }
+
+         this.sslStream = stream;
+         return stream;
+     }
+
+     /// <summary>
+     /// Validation callback for the broker certificate. Accepts it when there are
+     /// no policy errors; otherwise logs each reported error and returns
+     /// AcceptInvalidBrokerCert.
+     /// </summary>
+     private bool ValidateServerCertificate(object sender,
+                                            X509Certificate certificate,
+                                            X509Chain chain,
+                                            SslPolicyErrors sslPolicyErrors)
+     {
+         // The certificate may be null (expected when RemoteCertificateNotAvailable
+         // is reported), so it must not be dereferenced blindly.
+         if(certificate != null)
+         {
+             Tracer.DebugFormat("ValidateServerCertificate: Issued By {0}", certificate.Issuer);
+         }
+
+         if(sslPolicyErrors == SslPolicyErrors.None)
+         {
+             return true;
+         }
+
+         Tracer.WarnFormat("Certificate error: {0}", sslPolicyErrors.ToString());
+
+         // SslPolicyErrors is a flags enumeration, so several errors can be set
+         // at once; test each bit rather than comparing for equality.
+         if((sslPolicyErrors & SslPolicyErrors.RemoteCertificateChainErrors) != 0)
+         {
+             Tracer.Error("Chain Status errors: ");
+             if(chain != null)
+             {
+                 foreach( X509ChainStatus status in chain.ChainStatus )
+                 {
+                     Tracer.Error("*** Chain Status error: " + status.Status);
+                     Tracer.Error("*** Chain Status information: " + status.StatusInformation);
+                 }
+             }
+         }
+
+         if((sslPolicyErrors & SslPolicyErrors.RemoteCertificateNameMismatch) != 0)
+         {
+             Tracer.Error("Mismatch between Remote Cert Name.");
+         }
+
+         if((sslPolicyErrors & SslPolicyErrors.RemoteCertificateNotAvailable) != 0)
+         {
+             Tracer.Error("The Remote Certificate was not Available.");
+         }
+
+         // Configuration may or may not allow us to connect with an invalid broker cert.
+         return AcceptInvalidBrokerCert;
+     }
+
+     /// <summary>
+     /// Selection callback for the client certificate. Returns the only local
+     /// certificate if there is exactly one, otherwise the first whose Subject
+     /// matches ClientCertSubject (ignoring case), otherwise null.
+     /// </summary>
+     private X509Certificate SelectLocalCertificate(object sender,
+                                                    string targetHost,
+                                                    X509CertificateCollection localCertificates,
+                                                    X509Certificate remoteCertificate,
+                                                    string[] acceptableIssuers)
+     {
+         Tracer.DebugFormat("Client is selecting a local certificate from {0} possibilities.", localCertificates.Count);
+
+         if(localCertificates.Count == 1)
+         {
+             Tracer.Debug("Client has selected certificate with Subject = " + localCertificates[0].Subject);
+             return localCertificates[0];
+         }
+         else if(localCertificates.Count > 1 && this.clientCertSubject != null)
+         {
+             // Iterate with the collection's declared element type so that an entry
+             // which is not an X509Certificate2 cannot raise InvalidCastException.
+             foreach(X509Certificate certificate in localCertificates)
+             {
+                 Tracer.Debug("Checking Client Certificate := " + certificate.ToString());
+                 if(String.Equals(certificate.Subject, this.clientCertSubject, StringComparison.OrdinalIgnoreCase))
+                 {
+                     Tracer.Debug("Client has selected certificate with Subject = " + certificate.Subject);
+                     return certificate;
+                 }
+             }
+         }
+
+         Tracer.Debug("Client did not select a Certificate, returning null.");
+         return null;
+     }
+
+     /// <summary>
+     /// Builds the set of candidate client certificates: the single certificate
+     /// loaded from ClientCertFilename when one is configured, otherwise every
+     /// certificate in the configured certificate store.
+     /// </summary>
+     /// <exception cref="NMSException">KeyStoreLocation is not a recognized value.</exception>
+     private X509Certificate2Collection LoadCertificates()
+     {
+         X509Certificate2Collection collection = new X509Certificate2Collection();
+
+         if(!String.IsNullOrEmpty(this.clientCertFilename))
+         {
+             Tracer.Debug("Attempting to load Client Certificate from file := " + this.clientCertFilename);
+             X509Certificate2 certificate = new X509Certificate2(this.clientCertFilename, this.clientCertPassword);
+             Tracer.Debug("Loaded Client Certificate := " + certificate.ToString());
+
+             collection.Add(certificate);
+         }
+         else
+         {
+             string name = String.IsNullOrEmpty(this.keyStoreName) ? StoreName.My.ToString() : this.keyStoreName;
+
+             StoreLocation location = StoreLocation.CurrentUser;
+
+             if(!String.IsNullOrEmpty(this.keyStoreLocation))
+             {
+                 // The accepted values are fixed tokens, so compare them ordinally;
+                 // a culture-sensitive comparison can mis-handle them under some locales.
+                 if(String.Equals(this.keyStoreLocation, "CurrentUser", StringComparison.OrdinalIgnoreCase))
+                 {
+                     location = StoreLocation.CurrentUser;
+                 }
+                 else if(String.Equals(this.keyStoreLocation, "LocalMachine", StringComparison.OrdinalIgnoreCase))
+                 {
+                     location = StoreLocation.LocalMachine;
+                 }
+                 else
+                 {
+                     throw new NMSException("Invlalid StoreLocation given on URI");
+                 }
+             }
+
+             X509Store store = new X509Store(name, location);
+             store.Open(OpenFlags.ReadOnly);
+             try
+             {
+                 collection = store.Certificates;
+             }
+             finally
+             {
+                 // Always release the store, even if reading its certificates fails.
+                 store.Close();
+             }
+         }
+
+         return collection;
+     }
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransport.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransportFactory.cs?rev=1545423&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransportFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransportFactory.cs Mon Nov 25 22:19:12 2013
@@ -0,0 +1,96 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+using System;
+using System.Web;
+using System.Net.Sockets;
+
+namespace Apache.NMS.MQTT.Transport.Tcp
+{
+ /// <summary>
+ /// Transport factory tagged with MQTTTransportFactory("ssl"). It holds the SSL related
+ /// options as plain settable properties and copies them onto every SslTransport it creates.
+ /// </summary>
+ [MQTTTransportFactory("ssl")]
+ public class SslTransportFactory : TcpTransportFactory
+ {
+     public SslTransportFactory()
+     {
+     }
+
+     public string ServerName { get; set; }
+
+     public string ClientCertSubject { get; set; }
+
+     public string ClientCertFilename { get; set; }
+
+     public string ClientCertPassword { get; set; }
+
+     public bool AcceptInvalidBrokerCert { get; set; }
+
+     public string KeyStoreName { get; set; }
+
+     public string KeyStoreLocation { get; set; }
+
+     /// <summary>
+     /// Creates an SslTransport over the given socket and hands it this factory's SSL options.
+     /// The client certificate subject is URL-decoded before it is passed on.
+     /// </summary>
+     protected override ITransport DoCreateTransport(Uri location, Socket socket, IWireFormat wireFormat )
+     {
+         Tracer.Debug("Creating new instance of the SSL Transport.");
+         SslTransport sslTransport = new SslTransport(location, socket, wireFormat);
+
+         sslTransport.ClientCertSubject = HttpUtility.UrlDecode(this.ClientCertSubject);
+         sslTransport.ClientCertFilename = this.ClientCertFilename;
+         sslTransport.ClientCertPassword = this.ClientCertPassword;
+         sslTransport.ServerName = this.ServerName;
+         sslTransport.KeyStoreLocation = this.KeyStoreLocation;
+         sslTransport.KeyStoreName = this.KeyStoreName;
+         sslTransport.AcceptInvalidBrokerCert = this.AcceptInvalidBrokerCert;
+
+         return sslTransport;
+     }
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransportFactory.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=1545423&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs Mon Nov 25 22:19:12 2013
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.IO;
+using System.Net.Sockets;
+using System.Threading;
+using Apache.NMS.MQTT.Commands;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.MQTT.Transport.Tcp
+{
+ /// <summary>
+ /// An implementation of ITransport that uses sockets to communicate with the broker
+ /// </summary>
+ public class TcpTransport : ITransport
+ {
+ protected readonly object myLock = new object();
+ protected readonly Socket socket;
+ private IWireFormat wireformat;
+ private BinaryReader socketReader;
+ private BinaryWriter socketWriter;
+ private Thread readThread;
+ private bool started;
+ private bool disposed = false;
+ private readonly Atomic<bool> closed = new Atomic<bool>(false);
+ private volatile bool seenShutdown;
+ private readonly Uri connectedUri;
+ private int timeout = -1;
+ private int asynctimeout = -1;
+
+ private CommandHandler commandHandler;
+ private ExceptionHandler exceptionHandler;
+ private InterruptedHandler interruptedHandler;
+ private ResumedHandler resumedHandler;
+ private TimeSpan MAX_THREAD_WAIT = TimeSpan.FromMilliseconds(30000);
+
+ /// <summary>
+ /// Size in bytes of the receive buffer.
+ /// </summary>
+ private int receiveBufferSize = 8192;
+ public int ReceiveBufferSize
+ {
+ get { return receiveBufferSize; }
+ set { receiveBufferSize = value; }
+ }
+
+ /// <summary>
+ /// Size in bytes of send buffer.
+ /// </summary>
+ private int sendBufferSize = 8192;
+ public int SendBufferSize
+ {
+ get { return sendBufferSize; }
+ set { sendBufferSize = value; }
+ }
+
+ public TcpTransport(Uri uri, Socket socket, IWireFormat wireformat)
+ {
+ this.connectedUri = uri;
+ this.socket = socket;
+ this.wireformat = wireformat;
+ }
+
+ ~TcpTransport()
+ {
+ Dispose(false);
+ }
+
+ protected virtual Stream CreateSocketStream()
+ {
+ return new NetworkStream(socket);
+ }
+
+ /// <summary>
+ /// Starts the transport if it has not been started yet: creates the buffered socket
+ /// reader and writer and launches the background read thread. Calling it again while
+ /// started is a no-op.
+ /// </summary>
+ /// <exception cref="InvalidOperationException">the Command or Exception handler has not been assigned</exception>
+ public void Start()
+ {
+     lock(myLock)
+     {
+         if(started)
+         {
+             return;
+         }
+
+         if(null == commandHandler)
+         {
+             throw new InvalidOperationException(
+                 "command cannot be null when Start is called.");
+         }
+
+         if(null == exceptionHandler)
+         {
+             throw new InvalidOperationException(
+                 "exception cannot be null when Start is called.");
+         }
+
+         started = true;
+
+         // The writer and the reader each get their own stream from CreateSocketStream().
+         // Two distinct streams are not strictly required; this is kept for legacy reasons.
+         Stream outgoing = new BufferedStream(CreateSocketStream(), sendBufferSize);
+         socketWriter = new EndianBinaryWriter(outgoing);
+
+         Stream incoming = new BufferedStream(CreateSocketStream(), receiveBufferSize);
+         socketReader = new EndianBinaryReader(incoming);
+
+         // Background thread that runs ReadLoop for the lifetime of the connection.
+         Thread reader = new Thread(new ThreadStart(ReadLoop));
+         reader.IsBackground = true;
+         readThread = reader;
+         readThread.Start();
+     }
+ }
+
+ /// <summary>
+ /// Property IsStarted
+ /// </summary>
+ public bool IsStarted
+ {
+ get
+ {
+ lock(myLock)
+ {
+ return started;
+ }
+ }
+ }
+
+ /// <summary>
+ /// Marshals the command onto the socket writer while holding the transport lock.
+ /// If the transport is already closed nothing is written and the exception handler
+ /// is notified instead. Sending a DISCONNECT records that a shutdown was requested.
+ /// </summary>
+ /// <param name="command">the command to send</param>
+ public virtual void Oneway(Command command)
+ {
+     lock(myLock)
+     {
+         if(!closed.Value)
+         {
+             if(command is DISCONNECT)
+             {
+                 // Remembered so the read loop does not report the resulting read failure.
+                 seenShutdown = true;
+             }
+
+             WireFormat.Marshal(command, socketWriter);
+         }
+         else
+         {
+             this.exceptionHandler(this, new InvalidOperationException("Error writing to broker. Transport connection is closed."));
+         }
+     }
+ }
+
+ public FutureResponse AsyncRequest(Command command)
+ {
+ throw new NotImplementedException("Use a ResponseCorrelator if you want to issue AsyncRequest calls");
+ }
+
+ public bool TcpNoDelayEnabled
+ {
+#if !NETCF
+ get { return this.socket.NoDelay; }
+ set { this.socket.NoDelay = value; }
+#else
+ get { return false; }
+ set { }
+#endif
+ }
+
+ public Response Request(Command command)
+ {
+ throw new NotImplementedException("Use a ResponseCorrelator if you want to issue Request calls");
+ }
+
+ public Response Request(Command command, TimeSpan timeout)
+ {
+ throw new NotImplementedException("Use a ResponseCorrelator if you want to issue Request calls");
+ }
+
+ public void Stop()
+ {
+ Close();
+ }
+
+ /// <summary>
+ /// Tears the transport down: shuts down and closes the socket, closes the reader and
+ /// writer, and waits for the read thread to finish. Every step is best-effort; an
+ /// exception in one step is ignored so the remaining steps still run.
+ /// </summary>
+ public void Close()
+ {
+ lock(myLock)
+ {
+ // The teardown body only runs when CompareAndSet flips 'closed' from false to true,
+ // so repeated Close() calls after the first do nothing.
+ if(closed.CompareAndSet(false, true))
+ {
+ try
+ {
+ socket.Shutdown(SocketShutdown.Both);
+ }
+ catch
+ {
+ }
+
+ try
+ {
+ if(null != socketWriter)
+ {
+ socketWriter.Close();
+ }
+ }
+ catch
+ {
+ }
+ finally
+ {
+ socketWriter = null;
+ }
+
+ try
+ {
+ if(null != socketReader)
+ {
+ socketReader.Close();
+ }
+ }
+ catch
+ {
+ }
+ finally
+ {
+ socketReader = null;
+ }
+
+ try
+ {
+ socket.Close();
+ }
+ catch
+ {
+ }
+
+ if(null != readThread)
+ {
+ // Do not join when Close() is running on the read thread itself (ReadLoop calls
+ // Close() on a read failure); joining the current thread would never return.
+ if(Thread.CurrentThread != readThread && readThread.IsAlive)
+ {
+ // Give the reader up to MAX_THREAD_WAIT to exit, then abort it.
+ if(!readThread.Join((int) MAX_THREAD_WAIT.TotalMilliseconds))
+ {
+ readThread.Abort();
+ }
+ }
+
+ readThread = null;
+ }
+
+ started = false;
+ }
+ }
+ }
+
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ /// <summary>
+ /// Closes the transport and marks it as disposed. The <paramref name="disposing"/>
+ /// flag is not consulted: Close() runs both for explicit disposal and for the finalizer.
+ /// </summary>
+ protected void Dispose(bool disposing)
+ {
+ Close();
+ disposed = true;
+ }
+
+ public bool IsDisposed
+ {
+ get
+ {
+ return disposed;
+ }
+ }
+
+ /// <summary>
+ /// Thread function of the reader thread: repeatedly performs a blocking read of one
+ /// command from the socket and dispatches it to the command handler until the
+ /// transport is closed.
+ /// </summary>
+ public void ReadLoop()
+ {
+     // Error handling
+     // --------------
+     // A failure while reading/unmarshalling leaves the stream position unknown, so the
+     // connection cannot be recovered: the transport is closed, the failure is reported
+     // through the exception handler (unless a DISCONNECT was sent) and the loop ends.
+     //
+     // A failure inside the command handler is not necessarily fatal to the transport;
+     // it is only reported through the exception handler and reading continues.
+     while(!closed.Value)
+     {
+         Command received = null;
+
+         try
+         {
+             received = (Command) WireFormat.Unmarshal(socketReader);
+         }
+         catch(Exception readFailure)
+         {
+             if(!closed.Value)
+             {
+                 // Nothing more can be done with this transport, so close it first.
+                 Close();
+                 if(!seenShutdown)
+                 {
+                     this.exceptionHandler(this, readFailure);
+                 }
+             }
+
+             break;
+         }
+
+         if(received == null)
+         {
+             continue;
+         }
+
+         try
+         {
+             this.commandHandler(this, received);
+         }
+         catch(Exception handlerFailure)
+         {
+             this.exceptionHandler(this, handlerFailure);
+         }
+     }
+ }
+
+ // Implementation methods
+
+ /// <summary>
+ /// Timeout in milliseconds to wait for sending synchronous messages or commands.
+ /// Set to -1 for infinite timeout.
+ /// </summary>
+ public int Timeout
+ {
+ get { return this.timeout; }
+ set { this.timeout = value; }
+ }
+
+ /// <summary>
+ /// Timeout in milliseconds to wait for sending asynchronous messages or commands.
+ /// Set to -1 for infinite timeout.
+ /// </summary>
+ public int AsyncTimeout
+ {
+ get { return this.asynctimeout; }
+ set { this.asynctimeout = value; }
+ }
+
+ public CommandHandler Command
+ {
+ get { return commandHandler; }
+ set { this.commandHandler = value; }
+ }
+
+ public ExceptionHandler Exception
+ {
+ get { return exceptionHandler; }
+ set { this.exceptionHandler = value; }
+ }
+
+ public InterruptedHandler Interrupted
+ {
+ get { return interruptedHandler; }
+ set { this.interruptedHandler = value; }
+ }
+
+ public ResumedHandler Resumed
+ {
+ get { return resumedHandler; }
+ set { this.resumedHandler = value; }
+ }
+
+ public IWireFormat WireFormat
+ {
+ get { return wireformat; }
+ set { wireformat = value; }
+ }
+
+ public bool IsFaultTolerant
+ {
+ get { return false; }
+ }
+
+ public bool IsConnected
+ {
+ get { return socket.Connected; }
+ }
+
+ public Uri RemoteAddress
+ {
+ get { return connectedUri; }
+ }
+
+ public Object Narrow(Type type)
+ {
+ return this.GetType().Equals(type) ? this : null;
+ }
+
+ public bool IsReconnectSupported
+ {
+ get { return false; }
+ }
+
+ public bool IsUpdateURIsSupported
+ {
+ get { return false; }
+ }
+
+ public void UpdateURIs(bool rebalance, Uri[] updatedURIs)
+ {
+ throw new IOException();
+ }
+ }
+}
+
+
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=1545423&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs Mon Nov 25 22:19:12 2013
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Specialized;
+using System.Net;
+using System.Net.Sockets;
+using Apache.NMS.MQTT.Util;
+using Apache.NMS.MQTT.Protocol;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.MQTT.Transport.Tcp
+{
+ [MQTTTransportFactory("tcp")]
+ public class TcpTransportFactory : ITransportFactory
+ {
+ public TcpTransportFactory()
+ {
+ }
+
+ #region Properties
+
+ private bool useLogging = false;
+ public bool UseLogging
+ {
+ get { return useLogging; }
+ set { useLogging = value; }
+ }
+
+ /// <summary>
+ /// Should the Inactivity Monitor be enabled on this Transport.
+ /// </summary>
+ private bool useInactivityMonitor = true;
+ public bool UseInactivityMonitor
+ {
+ get { return this.useInactivityMonitor; }
+ set { this.useInactivityMonitor = value; }
+ }
+
+ /// <summary>
+ /// Size in bytes of the receive buffer.
+ /// </summary>
+ private int receiveBufferSize = 8192;
+ public int ReceiveBufferSize
+ {
+ get { return receiveBufferSize; }
+ set { receiveBufferSize = value; }
+ }
+
+ /// <summary>
+ /// Size in bytes of send buffer.
+ /// </summary>
+ private int sendBufferSize = 8192;
+ public int SendBufferSize
+ {
+ get { return sendBufferSize; }
+ set { sendBufferSize = value; }
+ }
+
+ /// <summary>
+ /// The time-out value, in milliseconds. The default value is 0, which indicates
+ /// an infinite time-out period. Specifying -1 also indicates an infinite time-out period.
+ /// </summary>
+ private int receiveTimeout = 0;
+ public int ReceiveTimeout
+ {
+ get { return receiveTimeout; }
+ set { receiveTimeout = value; }
+ }
+
+ /// <summary>
+ /// The time-out value, in milliseconds. If you set the property with a value between 1 and 499,
+ /// the value will be changed to 500. The default value is 0, which indicates an infinite
+ /// time-out period. Specifying -1 also indicates an infinite time-out period.
+ /// </summary>
+ private int sendTimeout = 0;
+ public int SendTimeout
+ {
+ get { return sendTimeout; }
+ set { sendTimeout = value; }
+ }
+
+ private int connectTimeout = 30000;
+ public int ConnectTimeout
+ {
+ get { return connectTimeout; }
+ set { this.connectTimeout = value; }
+ }
+
+ #endregion
+
+ #region ITransportFactory Members
+
+ /// <summary>
+ /// Connects a socket to the broker named by the URI and wraps it in the transport
+ /// produced by DoCreateTransport, without the MutexTransport / ResponseCorrelator
+ /// filters that CreateTransport adds.
+ /// </summary>
+ /// <param name="location">
+ /// Broker URI. Query options prefixed with "transport." are applied to this factory.
+ /// An optional path of the form /localAddress:localPort requests that the socket be
+ /// bound to that local end point before connecting.
+ /// </param>
+ /// <returns>the newly created transport</returns>
+ public ITransport CompositeConnect(Uri location)
+ {
+     // Extract query parameters from broker Uri
+     StringDictionary map = URISupport.ParseQuery(location.Query);
+
+     // Set transport. properties on this (the factory)
+     URISupport.SetProperties(this, map, "transport.");
+
+     // See if there is a local address and port specified
+     string localAddress = null;
+     int localPort = -1;
+
+     string path = location.AbsolutePath;
+     if(!String.IsNullOrEmpty(path) && !path.Equals("/"))
+     {
+         int index = path.IndexOf(':');
+         int parsedPort;
+
+         // The port is parsed as a 32 bit value and range checked: TCP ports go up to
+         // 65535, which does not fit into an Int16. index > 1 guarantees a non-empty
+         // address between the leading '/' and the ':'.
+         if(index > 1
+            && Int32.TryParse(path.Substring(index + 1), out parsedPort)
+            && parsedPort >= IPEndPoint.MinPort
+            && parsedPort <= IPEndPoint.MaxPort)
+         {
+             localPort = parsedPort;
+             localAddress = path.Substring(1, index - 1);
+             Tracer.DebugFormat("Binding Socket to {0} on port: {1}", localAddress, localPort);
+         }
+         else
+         {
+             Tracer.Warn("Invalid Port value on URI for local bind option, ignoring.");
+         }
+     }
+
+     Tracer.Debug("Opening socket to: " + location.Host + " on port: " + location.Port);
+     Socket socket = DoConnect(location.Host, location.Port, localAddress, localPort );
+
+     socket.ReceiveBufferSize = ReceiveBufferSize;
+     socket.SendBufferSize = SendBufferSize;
+     socket.ReceiveTimeout = ReceiveTimeout;
+     socket.SendTimeout = SendTimeout;
+
+     MQTTWireFormat wireformat = new MQTTWireFormat();
+     ITransport transport = DoCreateTransport(location, socket, wireformat);
+
+     wireformat.Transport = transport;
+
+     // TODO
+ //    if(UseLogging)
+ //    {
+ //        transport = new LoggingTransport(transport);
+ //    }
+ //
+ //    if(UseInactivityMonitor)
+ //    {
+ //        transport = new InactivityMonitor(transport);
+ //    }
+
+     return transport;
+ }
+
+ /// <summary>
+ /// Connects to the broker via CompositeConnect and wraps the resulting transport first
+ /// in a MutexTransport and then in a ResponseCorrelator.
+ /// </summary>
+ /// <param name="location">broker URI</param>
+ /// <returns>the outermost (ResponseCorrelator) transport</returns>
+ public ITransport CreateTransport(Uri location)
+ {
+     ITransport connected = CompositeConnect(location);
+     ITransport serialized = new MutexTransport(connected);
+
+     return new ResponseCorrelator(serialized);
+ }
+
+ #endregion
+
+ /// <summary>
+ /// Creates the concrete transport for an already connected socket. Subclasses override
+ /// this to supply their own transport type.
+ /// </summary>
+ protected virtual ITransport DoCreateTransport(Uri location, Socket socket, IWireFormat wireFormat )
+ {
+     // The factory's buffer sizes are handed to the transport as well, so that it can
+     // buffer above the TCP level, which can eagerly send causing sparse packets.
+     TcpTransport tcpTransport = new TcpTransport(location, socket, wireFormat)
+     {
+         SendBufferSize = SendBufferSize,
+         ReceiveBufferSize = ReceiveBufferSize
+     };
+
+     return tcpTransport;
+ }
+
+ // DISCUSSION: Caching host entries may not be the best strategy when using the
+ // failover protocol. The failover protocol needs to be very dynamic when looking
+ // up hostnames at runtime. If old hostname->IP mappings are kept around, this may
+ // lead to runtime failures that could have been avoided by dynamically looking up
+ // the new hostname IP.
+#if CACHE_HOSTENTRIES
+ private static IDictionary<string, IPHostEntry> CachedIPHostEntries = new Dictionary<string, IPHostEntry>();
+ private static readonly object _syncLock = new object();
+#endif
+ /// <summary>
+ /// Resolves a host name to its IPHostEntry through Dns.GetHostEntry.
+ /// Any exception raised by the lookup is swallowed and reported as a null result.
+ /// </summary>
+ /// <param name="host">host name to resolve</param>
+ /// <returns>the resolved entry, or null when the lookup failed</returns>
+ public static IPHostEntry GetIPHostEntry(string host)
+ {
+ IPHostEntry ipEntry;
+
+ #if CACHE_HOSTENTRIES
+ // Caching build: entries are keyed by the upper-cased host name and all cache
+ // access happens under _syncLock. Failed lookups are not added to the cache.
+ string hostUpperName = host.ToUpper();
+
+ lock (_syncLock)
+ {
+ if (!CachedIPHostEntries.TryGetValue(hostUpperName, out ipEntry))
+ {
+ try
+ {
+ ipEntry = Dns.GetHostEntry(hostUpperName);
+ CachedIPHostEntries.Add(hostUpperName, ipEntry);
+ }
+ catch
+ {
+ ipEntry = null;
+ }
+ }
+ }
+ #else
+ // Default build: every call performs a fresh lookup.
+ try
+ {
+ ipEntry = Dns.GetHostEntry(host);
+ }
+ catch
+ {
+ ipEntry = null;
+ }
+ #endif
+
+ return ipEntry;
+ }
+
+ /// <summary>
+ /// Attempts to open a TCP connection to the given address, optionally binding the
+ /// socket to a local end point first. The attempt is best-effort: any failure
+ /// (including not connecting within ConnectTimeout milliseconds) yields null
+ /// instead of an exception.
+ /// </summary>
+ /// <param name="address">remote address; null makes the method return null immediately</param>
+ /// <param name="port">remote port</param>
+ /// <param name="localAddress">local address to bind to, or null/empty for no explicit bind</param>
+ /// <param name="localPort">local port used together with <paramref name="localAddress"/></param>
+ /// <returns>a connected socket, or null when the connection could not be established</returns>
+ private Socket TryConnectSocket(IPAddress address, int port, string localAddress, int localPort)
+ {
+     if(null == address)
+     {
+         return null;
+     }
+
+     Socket socket = null;
+
+     try
+     {
+         socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
+
+         if(!String.IsNullOrEmpty(localAddress))
+         {
+             DoBind(socket, localAddress, localPort);
+         }
+
+         IAsyncResult result = socket.BeginConnect(new IPEndPoint(address, port), null, null);
+         if(result.AsyncWaitHandle.WaitOne(ConnectTimeout, true))
+         {
+             // Complete the asynchronous operation; throws if the connect failed.
+             socket.EndConnect(result);
+         }
+
+         if(socket.Connected)
+         {
+             return socket;
+         }
+     }
+     catch(Exception ex)
+     {
+         // Best-effort: the caller moves on to the next candidate address.
+         Tracer.DebugFormat("Connection attempt to {0}:{1} failed: {2}", address, port, ex.Message);
+     }
+
+     // Not connected (timeout or failure): release the socket so it does not leak.
+     if(null != socket)
+     {
+         try
+         {
+             socket.Close();
+         }
+         catch
+         {
+         }
+     }
+
+     return null;
+ }
+
+ public static bool TryParseIPAddress(string host, out IPAddress ipaddress)
+ {
+ return IPAddress.TryParse(host, out ipaddress);
+ }
+
+ /// <summary>
+ /// Resolves the host name and returns its first address of the requested family.
+ /// </summary>
+ /// <param name="hostname">host name to resolve</param>
+ /// <param name="addressFamily">address family the result must belong to</param>
+ /// <returns>the matching address, or null when resolution failed or no address matches</returns>
+ public static IPAddress GetIPAddress(string hostname, AddressFamily addressFamily)
+ {
+     IPHostEntry resolved = GetIPHostEntry(hostname);
+
+     return (null == resolved) ? null : GetIPAddress(resolved, addressFamily);
+ }
+
+ /// <summary>
+ /// Returns the first address in the host entry's address list that belongs to the
+ /// requested address family.
+ /// </summary>
+ /// <param name="hostEntry">resolved host entry; may be null</param>
+ /// <param name="addressFamily">address family the result must belong to</param>
+ /// <returns>the matching address, or null when the entry is null or nothing matches</returns>
+ public static IPAddress GetIPAddress(IPHostEntry hostEntry, AddressFamily addressFamily)
+ {
+     if(null == hostEntry)
+     {
+         return null;
+     }
+
+     foreach(IPAddress candidate in hostEntry.AddressList)
+     {
+         if(candidate.AddressFamily != addressFamily)
+         {
+             continue;
+         }
+
+         return candidate;
+     }
+
+     return null;
+ }
+
+ /// <summary>
+ /// Opens a connected socket to the given host and port. A literal IP address is used
+ /// directly; a host name is resolved and its IPv6 address is tried first, then its
+ /// IPv4 address, then any address of another family.
+ /// </summary>
+ /// <param name="host">remote host name or literal IP address</param>
+ /// <param name="port">remote port</param>
+ /// <param name="localAddress">local address to bind to, or null/empty for no explicit bind</param>
+ /// <param name="localPort">local port used together with <paramref name="localAddress"/></param>
+ /// <returns>the connected socket</returns>
+ /// <exception cref="NMSConnectionException">no connection could be established</exception>
+ protected Socket DoConnect(string host, int port, string localAddress, int localPort)
+ {
+     Socket socket = null;
+     IPAddress ipaddress;
+
+     try
+     {
+         if(TryParseIPAddress(host, out ipaddress))
+         {
+             socket = TryConnectSocket(ipaddress, port, localAddress, localPort);
+         }
+         else
+         {
+             // Looping through the AddressList allows different type of connections to be tried
+             // (IPv6, IPv4 and whatever else may be available).
+             IPHostEntry hostEntry = GetIPHostEntry(host);
+
+             if(null != hostEntry)
+             {
+                 // Prefer IPv6 first.
+                 ipaddress = GetIPAddress(hostEntry, AddressFamily.InterNetworkV6);
+                 socket = TryConnectSocket(ipaddress, port, localAddress, localPort);
+
+                 if(null == socket)
+                 {
+                     // Try IPv4 next.
+                     ipaddress = GetIPAddress(hostEntry, AddressFamily.InterNetwork);
+                     socket = TryConnectSocket(ipaddress, port, localAddress, localPort);
+                 }
+
+                 if(null == socket)
+                 {
+                     // Try whatever else there is.
+                     foreach(IPAddress address in hostEntry.AddressList)
+                     {
+                         if(AddressFamily.InterNetworkV6 == address.AddressFamily
+                            || AddressFamily.InterNetwork == address.AddressFamily)
+                         {
+                             // Already tried these protocols.
+                             continue;
+                         }
+
+                         // Connect to the candidate from the list, not to the stale
+                         // IPv4 lookup result left in 'ipaddress'.
+                         socket = TryConnectSocket(address, port, localAddress, localPort);
+                         if(null != socket)
+                         {
+                             ipaddress = address;
+                             break;
+                         }
+                     }
+                 }
+             }
+         }
+
+         if(null == socket)
+         {
+             throw new SocketException();
+         }
+     }
+     catch(Exception ex)
+     {
+         throw new NMSConnectionException(String.Format("Error connecting to {0}:{1}.", host, port), ex);
+     }
+
+     Tracer.DebugFormat("Connected to {0}:{1} using {2} protocol.", host, port, ipaddress.AddressFamily.ToString());
+     return socket;
+ }
+
+ /// <summary>
+ /// Binds the socket to a local end point. A literal IP address is used directly; a host
+ /// name is resolved and its IPv6 address is tried first, then its IPv4 address, then
+ /// any address of another family.
+ /// </summary>
+ /// <param name="socket">socket to bind</param>
+ /// <param name="host">local host name or literal IP address</param>
+ /// <param name="port">local port</param>
+ /// <exception cref="NMSConnectionException">the socket could not be bound</exception>
+ protected void DoBind(Socket socket, string host, int port)
+ {
+     IPAddress ipaddress;
+
+     try
+     {
+         if(TryParseIPAddress(host, out ipaddress))
+         {
+             TryBindSocket(socket, ipaddress, port);
+         }
+         else
+         {
+             // Looping through the AddressList allows different type of connections to be tried
+             // (IPv6, IPv4 and whatever else may be available).
+             IPHostEntry hostEntry = GetIPHostEntry(host);
+
+             if(null != hostEntry)
+             {
+                 // Prefer IPv6 first.
+                 bool bound = TryBindSocket(socket, GetIPAddress(hostEntry, AddressFamily.InterNetworkV6), port);
+
+                 if(!bound)
+                 {
+                     // Try IPv4 next.
+                     bound = TryBindSocket(socket, GetIPAddress(hostEntry, AddressFamily.InterNetwork), port);
+                 }
+
+                 if(!bound)
+                 {
+                     // Try whatever else there is.
+                     foreach(IPAddress address in hostEntry.AddressList)
+                     {
+                         if(AddressFamily.InterNetworkV6 == address.AddressFamily
+                            || AddressFamily.InterNetwork == address.AddressFamily)
+                         {
+                             // Already tried these protocols.
+                             continue;
+                         }
+
+                         // Bind to the candidate from the list, not to the result of
+                         // the earlier IPv4 lookup.
+                         if(TryBindSocket(socket, address, port))
+                         {
+                             break;
+                         }
+                     }
+                 }
+             }
+         }
+
+         if(!socket.IsBound)
+         {
+             throw new SocketException();
+         }
+     }
+     catch(Exception ex)
+     {
+         throw new NMSConnectionException(String.Format("Error binding to {0}:{1}.", host, port), ex);
+     }
+
+     Tracer.DebugFormat("Bound to {0}:{1} using.", host, port);
+ }
+
+ /// <summary>
+ /// Tries to bind the socket to the given local address and port. Exceptions raised by
+ /// the bind are swallowed and reported as a false result.
+ /// </summary>
+ /// <returns>true when the socket reports itself as bound afterwards; false when either
+ /// argument is null or the bind failed</returns>
+ private bool TryBindSocket(Socket socket, IPAddress address, int port)
+ {
+     if(null == socket || null == address)
+     {
+         return false;
+     }
+
+     try
+     {
+         IPEndPoint localEndPoint = new IPEndPoint(address, port);
+         socket.Bind(localEndPoint);
+         return socket.IsBound;
+     }
+     catch
+     {
+         return false;
+     }
+ }
+
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/TransportFilter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=1545423&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/TransportFilter.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/TransportFilter.cs Mon Nov 25 22:19:12 2013
@@ -0,0 +1,267 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+using System;
+using Apache.NMS.MQTT.Commands;
+
+namespace Apache.NMS.MQTT.Transport
+{
+ /// <summary>
+ /// Used to implement a filter on the transport layer.
+ /// </summary>
+ public class TransportFilter : ITransport
+ {
+ protected readonly ITransport next;
+ protected CommandHandler commandHandler;
+ protected ExceptionHandler exceptionHandler;
+ protected InterruptedHandler interruptedHandler;
+ protected ResumedHandler resumedHandler;
+ private bool disposed = false;
+
+ /// <summary>
+ /// Wraps the given transport and installs this filter's OnCommand, OnException,
+ /// OnInterrupted and OnResumed methods as that transport's handlers.
+ /// </summary>
+ /// <param name="next">the transport this filter delegates to</param>
+ public TransportFilter(ITransport next)
+ {
+     this.next = next;
+
+     CommandHandler onCommand = OnCommand;
+     ExceptionHandler onException = OnException;
+     InterruptedHandler onInterrupted = OnInterrupted;
+     ResumedHandler onResumed = OnResumed;
+
+     this.next.Command = onCommand;
+     this.next.Exception = onException;
+     this.next.Interrupted = onInterrupted;
+     this.next.Resumed = onResumed;
+ }
+
+ ~TransportFilter()
+ {
+ Dispose(false);
+ }
+
+ protected virtual void OnCommand(ITransport sender, Command command)
+ {
+ this.commandHandler(sender, command);
+ }
+
+ protected virtual void OnException(ITransport sender, Exception command)
+ {
+ this.exceptionHandler(sender, command);
+ }
+
+ protected virtual void OnInterrupted(ITransport sender)
+ {
+ if(this.interruptedHandler != null)
+ {
+ this.interruptedHandler(sender);
+ }
+ }
+
+ protected virtual void OnResumed(ITransport sender)
+ {
+ if(this.resumedHandler != null)
+ {
+ this.resumedHandler(sender);
+ }
+ }
+
+ /// <summary>
+ /// Method Oneway
+ /// </summary>
+ /// <param name="command">A Command</param>
+ public virtual void Oneway(Command command)
+ {
+ this.next.Oneway(command);
+ }
+
+ /// <summary>
+ /// Method AsyncRequest
+ /// </summary>
+ /// <returns>A FutureResponse</returns>
+ /// <param name="command">A Command</param>
+ public virtual FutureResponse AsyncRequest(Command command)
+ {
+ return this.next.AsyncRequest(command);
+ }
+
+ /// <summary>
+ /// Method Request
+ /// </summary>
+ /// <returns>A Response</returns>
+ /// <param name="command">A Command</param>
+ public virtual Response Request(Command command)
+ {
+ return Request(command, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite));
+ }
+
+ /// <summary>
+ /// Method Request with time out for Response.
+ /// </summary>
+ /// <returns>A Response</returns>
+ /// <param name="command">A Command</param>
+ /// <param name="timeout">Timeout in milliseconds</param>
+ public virtual Response Request(Command command, TimeSpan timeout)
+ {
+ return this.next.Request(command, timeout);
+ }
+
+ /// <summary>
+ /// Starts the wrapped transport after verifying that both the command handler and the
+ /// exception handler of this filter have been assigned.
+ /// </summary>
+ /// <exception cref="InvalidOperationException">the Command or Exception handler is null</exception>
+ public virtual void Start()
+ {
+     if(null == commandHandler)
+     {
+         throw new InvalidOperationException("command cannot be null when Start is called.");
+     }
+     else if(null == exceptionHandler)
+     {
+         throw new InvalidOperationException("exception cannot be null when Start is called.");
+     }
+
+     this.next.Start();
+ }
+
+ /// <summary>
+ /// Property IsStarted
+ /// </summary>
+ public bool IsStarted
+ {
+ get { return this.next.IsStarted; }
+ }
+
+ /// <summary>
+ /// Method Dispose
+ /// </summary>
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ /// <summary>
+ /// Disposes the wrapped transport on the first explicit disposal and marks this filter
+ /// as disposed. When called with <paramref name="disposing"/> false (the finalizer
+ /// path) the wrapped transport is left alone and only the flag is set.
+ /// </summary>
+ protected virtual void Dispose(bool disposing)
+ {
+     bool disposeNext = disposing && !disposed;
+
+     if(disposeNext)
+     {
+         string nextTypeName = this.next.GetType().Name;
+         Tracer.Debug("TransportFilter disposing of next Transport: " + nextTypeName);
+         this.next.Dispose();
+     }
+
+     disposed = true;
+ }
+
+ public bool IsDisposed
+ {
+ get
+ {
+ return disposed;
+ }
+ }
+
+ public CommandHandler Command
+ {
+ get { return commandHandler; }
+ set { this.commandHandler = value; }
+ }
+
+ public ExceptionHandler Exception
+ {
+ get { return exceptionHandler; }
+ set { this.exceptionHandler = value; }
+ }
+
+ public InterruptedHandler Interrupted
+ {
+ get { return interruptedHandler; }
+ set { this.interruptedHandler = value; }
+ }
+
+ public ResumedHandler Resumed
+ {
+ get { return resumedHandler; }
+ set { this.resumedHandler = value; }
+ }
+
+ public virtual void Stop()
+ {
+ this.next.Stop();
+ }
+
+ /// <summary>
+ /// Looks for a transport of exactly the given type in the filter chain, starting with
+ /// this filter and otherwise delegating to the wrapped transport.
+ /// </summary>
+ /// <param name="type">the exact runtime type to look for</param>
+ /// <returns>the matching transport, or null when none is found</returns>
+ public Object Narrow(Type type)
+ {
+     if(this.GetType().Equals(type))
+     {
+         return this;
+     }
+
+     if(null == this.next)
+     {
+         return null;
+     }
+
+     return this.next.Narrow( type );
+ }
+
+ /// <summary>
+ /// Timeout in milliseconds to wait for sending synchronous messages or commands.
+ /// Set to -1 for infinite timeout.
+ /// </summary>
+ public int Timeout
+ {
+ get { return next.Timeout; }
+ set { next.Timeout = value; }
+ }
+
+ /// <summary>
+ /// Timeout in milliseconds to wait for sending asynchronous messages or commands.
+ /// Set to -1 for infinite timeout.
+ /// </summary>
+ public int AsyncTimeout
+ {
+ get { return next.AsyncTimeout; }
+ set { next.AsyncTimeout = value; }
+ }
+
+ public bool IsFaultTolerant
+ {
+ get { return next.IsFaultTolerant; }
+ }
+
+ public bool IsConnected
+ {
+ get { return next.IsConnected; }
+ }
+
+ public Uri RemoteAddress
+ {
+ get { return next.RemoteAddress; }
+ }
+
+ public bool IsReconnectSupported
+ {
+ get { return next.IsReconnectSupported; }
+ }
+
+ public bool IsUpdateURIsSupported
+ {
+ get { return next.IsUpdateURIsSupported; }
+ }
+
+ public void UpdateURIs(bool rebalance, Uri[] updatedURIs)
+ {
+ next.UpdateURIs(rebalance, updatedURIs);
+ }
+
+ public IWireFormat WireFormat
+ {
+ get { return next.WireFormat; }
+ }
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/TransportFilter.cs
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs?rev=1545423&r1=1545422&r2=1545423&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs Mon Nov 25 22:19:12 2013
@@ -22,6 +22,6 @@ using System.Runtime.InteropServices;
[assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2013 Apache Software Foundation")]
[assembly: AssemblyTrademarkAttribute("")]
[assembly: AssemblyCultureAttribute("")]
-[assembly: AssemblyVersionAttribute("1.7.0.3244")]
+[assembly: AssemblyVersionAttribute("1.7.0.3250")]
[assembly: AssemblyInformationalVersionAttribute("1.7.0")]
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj?rev=1545423&r1=1545422&r2=1545423&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj Mon Nov 25 22:19:12 2013
@@ -34,6 +34,7 @@
<Reference Include="Apache.NMS">
<HintPath>lib\Apache.NMS\mono-2.0\Apache.NMS.dll</HintPath>
</Reference>
+ <Reference Include="System.Web" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
@@ -93,6 +94,16 @@
<Compile Include="src\main\csharp\RequestTimedOutException.cs" />
<Compile Include="src\main\csharp\Threads\DefaultThreadPools.cs" />
<Compile Include="src\main\csharp\Threads\PooledTaskRunner.cs" />
+ <Compile Include="src\main\csharp\Transport\Tcp\SslTransport.cs" />
+ <Compile Include="src\main\csharp\Transport\Tcp\SslTransportFactory.cs" />
+ <Compile Include="src\main\csharp\Transport\Tcp\TcpTransport.cs" />
+ <Compile Include="src\main\csharp\Transport\Tcp\TcpTransportFactory.cs" />
+ <Compile Include="src\main\csharp\Transport\IWireFormat.cs" />
+ <Compile Include="src\main\csharp\Transport\TransportFilter.cs" />
+ <Compile Include="src\main\csharp\Protocol\MQTTWireFormat.cs" />
+ <Compile Include="src\main\csharp\Transport\LoggingTransport.cs" />
+ <Compile Include="src\main\csharp\Transport\MutexTransport.cs" />
+ <Compile Include="src\main\csharp\Transport\ResponseCorrelator.cs" />
</ItemGroup>
<ItemGroup>
<Folder Include="keyfile\" />