You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/07/07 00:19:34 UTC

svn commit: r961033 - in /activemq/activemq-dotnet/Apache.NMS.Stomp/trunk: ./ src/main/csharp/ src/main/csharp/Commands/ src/main/csharp/Protocol/ src/main/csharp/Transport/ src/main/csharp/Transport/Failover/ src/main/csharp/Transport/Tcp/ src/test/cs...

Author: tabish
Date: Tue Jul  6 22:19:34 2010
New Revision: 961033

URL: http://svn.apache.org/viewvc?rev=961033&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-254

Adds a basic FailoverTransport and Inactivity monitor.  Also provides a simple ConnectionStateTracker that can track the Connection and restore all its Consumer subscriptions once a successful failover is completed.

Added:
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/KeepAliveInfo.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/IOException.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ICompositeTransport.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs   (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructureTypes.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFactory.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MapMessageTest.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.csproj
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.sln

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs?rev=961033&r1=961032&r2=961033&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs Tue Jul  6 22:19:34 2010
@@ -16,6 +16,7 @@
  */
 
 using System;
+using Apache.NMS.Stomp.State;
 
 namespace Apache.NMS.Stomp.Commands
 {
@@ -200,6 +201,14 @@ namespace Apache.NMS.Stomp.Commands
             }
         }
 
+        public virtual bool IsKeepAliveInfo
+        {
+            get
+            {
+                return false;
+            }
+        }
+
         public virtual bool ResponseRequired
         {
             get
@@ -212,6 +221,11 @@ namespace Apache.NMS.Stomp.Commands
             }
         }
 
+        public virtual Response visit(ICommandVisitor visitor)
+        {
+            throw new ApplicationException("BaseCommand.Visit() not implemented");
+        }
+
         public override Object Clone()
         {
             // Since we are a derived class use the base's Clone()

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs?rev=961033&r1=961032&r2=961033&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs Tue Jul  6 22:19:34 2010
@@ -16,6 +16,7 @@
  */
 
 using System;
+using Apache.NMS.Stomp.State;
 
 namespace Apache.NMS.Stomp.Commands
 {
@@ -81,6 +82,17 @@ namespace Apache.NMS.Stomp.Commands
             get;
         }
 
+        bool IsKeepAliveInfo
+        {
+            get;
+        }
+
+        bool IsShutdownInfo
+        {
+            get;
+        }
+
+        Response visit(ICommandVisitor visitor);
     }
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructureTypes.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructureTypes.cs?rev=961033&r1=961032&r2=961033&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructureTypes.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructureTypes.cs Tue Jul  6 22:19:34 2010
@@ -49,6 +49,7 @@ namespace Apache.NMS.Stomp.Commands
         public const byte RemoveInfoType = 25;
         public const byte RemoveSubscriptionInfoType = 26;
         public const byte ErrorResponseType = 27;
+        public const byte KeepAliveInfoType = 28;
         
         public const byte DestinationType = 48;
         public const byte TempDestinationType = 49;
@@ -128,6 +129,9 @@ namespace Apache.NMS.Stomp.Commands
             case ErrorResponseType:
                 packetTypeStr = "ErrorResponseType";
                 break;
+            case KeepAliveInfoType:
+                packetTypeStr = "KeepAliveInfoType";
+                break;
             case DestinationType:
                 packetTypeStr = "DestinationType";
                 break;

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/KeepAliveInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/KeepAliveInfo.cs?rev=961033&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/KeepAliveInfo.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/KeepAliveInfo.cs Tue Jul  6 22:19:34 2010
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+
+using Apache.NMS.Stomp.State;
+
+namespace Apache.NMS.Stomp.Commands
+{
+    public class KeepAliveInfo : BaseCommand
+    {
+        ///
+        /// <summary>
+        ///  Returns the data structure type code that identifies this command,
+        ///  DataStructureTypes.KeepAliveInfoType.
+        /// </summary>
+        ///
+        public override byte GetDataStructureType()
+        {
+            return DataStructureTypes.KeepAliveInfoType;
+        }
+
+        ///
+        /// <summary>
+        ///  Returns a string describing this command: its type name followed by
+        ///  its commandId and responseRequired values.
+        /// </summary>
+        ///
+        public override string ToString()
+        {
+            return GetType().Name + "[ " + 
+                "commandId = " + this.CommandId + ", " + 
+                "responseRequired = " + this.ResponseRequired + ", " + " ]";
+        }
+
+        ///
+        /// <summary>
+        ///  Always true: identifies this command as a KeepAliveInfo.
+        /// </summary>
+        ///
+        public override bool IsKeepAliveInfo
+        {
+            get
+            {
+                return true;
+            }
+        }
+
+        ///
+        /// <summary>
+        ///  Allows a Visitor to visit this command and return a response to the
+        ///  command based on the command type being visited.  This command calls
+        ///  the visitor's processKeepAliveInfo method.
+        /// </summary>
+        ///
+        public override Response visit(ICommandVisitor visitor)
+        {
+            return visitor.processKeepAliveInfo( this );
+        }
+
+    };
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/KeepAliveInfo.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/KeepAliveInfo.cs
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/IOException.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/IOException.cs?rev=961033&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/IOException.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/IOException.cs Tue Jul  6 22:19:34 2010
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+using Apache.NMS.Stomp.Commands;
+using System.Text;
+using Apache.NMS;
+
+namespace Apache.NMS.Stomp
+{
+
+	/// <summary>
+	/// Exception thrown when an IO error occurs
+	/// </summary>
+	public class IOException : NMSException
+	{
+		public IOException()
+			: base("IO Exception failed with missing exception log")
+		{
+		}
+
+		public IOException(String msg)
+			: base(msg)
+		{
+		}
+
+		public IOException(String msg, Exception inner)
+			: base(msg, inner)
+		{
+		}
+	}
+}
+
+

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/IOException.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/IOException.cs
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs?rev=961033&r1=961032&r2=961033&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs Tue Jul  6 22:19:34 2010
@@ -93,6 +93,10 @@ namespace Apache.NMS.Stomp.Protocol
             {
                 WriteRemoveInfo((RemoveInfo) o, dataOut);
             }
+            else if(o is KeepAliveInfo)
+            {
+                WriteKeepAliveInfo((KeepAliveInfo) o, dataOut);
+            }
             else if(o is Command)
             {
                 Command command = o as Command;
@@ -435,7 +439,7 @@ namespace Apache.NMS.Stomp.Protocol
             }
             else
             {
-            //    frame.SetProperty("transformation", "jms-map-xml");
+                frame.SetProperty("transformation", "jms-xml");
             }
 
             frame.SetProperty("activemq.dispatchAsync", command.DispatchAsync);
@@ -465,6 +469,11 @@ namespace Apache.NMS.Stomp.Protocol
             frame.ToStream(dataOut);
         }
 
+        protected virtual void WriteKeepAliveInfo(KeepAliveInfo command, BinaryWriter dataOut)
+        {
+            dataOut.Write((byte) '\n' );
+        }
+
         protected virtual void WriteRemoveInfo(RemoveInfo command, BinaryWriter dataOut)
         {
             StompFrame frame = new StompFrame("UNSUBSCRIBE");

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=961033&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Tue Jul  6 22:19:34 2010
@@ -0,0 +1,903 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Threading;
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Stomp.State;
+using Apache.NMS.Stomp.Threads;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Stomp.Transport.Failover
+{
+    /// <summary>
+    /// A Transport that is made reliable by being able to fail over to another
+    /// transport when a transport failure is detected.
+    /// </summary>
+    public class FailoverTransport : ICompositeTransport, IComparable
+    {
+        private static int idCounter = 0;
+        private int id;
+
+        private bool disposed;
+        private bool connected;
+        private List<Uri> uris = new List<Uri>();
+        private CommandHandler commandHandler;
+        private ExceptionHandler exceptionHandler;
+        private InterruptedHandler interruptedHandler;
+        private ResumedHandler resumedHandler;
+
+        private Mutex reconnectMutex = new Mutex();
+        private Mutex sleepMutex = new Mutex();
+        private ConnectionStateTracker stateTracker = new ConnectionStateTracker();
+        private Dictionary<int, Command> requestMap = new Dictionary<int, Command>();
+
+        private Uri connectedTransportURI;
+        private Uri failedConnectTransportURI;
+        private AtomicReference<ITransport> connectedTransport = new AtomicReference<ITransport>(null);
+        private TaskRunner reconnectTask = null;
+        private bool started;
+
+        private int timeout = -1;
+        private int initialReconnectDelay = 10;
+        private int maxReconnectDelay = 1000 * 30;
+        private int backOffMultiplier = 2;
+        private bool useExponentialBackOff = true;
+        private bool randomize = true;
+        private int maxReconnectAttempts;
+        private int connectFailures;
+        private int reconnectDelay = 10;
+        private Exception connectionFailure;
+        private bool firstConnection = true;
+        private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
+        private volatile Exception failure;
+        private readonly object mutex = new object();
+
+        public FailoverTransport()
+        {
+            id = idCounter++;
+        }
+
+        ~FailoverTransport()
+        {
+            Dispose(false);
+        }
+        
+        #region FailoverTask
+        
+        /// <summary>
+        /// Task whose iteration attempts a connect on the owning transport when
+        /// that transport is usable but has no connected transport.
+        /// </summary>
+        private class FailoverTask : Task
+        {
+            private FailoverTransport owner;
+
+            public FailoverTask(FailoverTransport p)
+            {
+                owner = p;
+            }
+
+            /// <summary>
+            /// Runs one iteration of the task.
+            /// </summary>
+            /// <returns>
+            /// The result of DoConnect() when a connect was attempted; false when
+            /// the owner is disposed, has a recorded connection failure, or
+            /// already has a connected transport.
+            /// </returns>
+            public bool Iterate()
+            {
+                if(owner.disposed || owner.connectionFailure != null)
+                {
+                    return false;
+                }
+
+                if(owner.ConnectedTransport != null)
+                {
+                    return false;
+                }
+
+                return owner.DoConnect();
+            }
+        }
+
+        #endregion
+
+        #region Property Accessors
+
+        public CommandHandler Command
+        {
+            get { return commandHandler; }
+            set { commandHandler = value; }
+        }
+
+        public ExceptionHandler Exception
+        {
+            get { return exceptionHandler; }
+            set { exceptionHandler = value; }
+        }
+
+        public InterruptedHandler Interrupted
+        {
+            get { return interruptedHandler; }
+            set { this.interruptedHandler = value; }
+        }
+
+        public ResumedHandler Resumed
+        {
+            get { return resumedHandler; }
+            set { this.resumedHandler = value; }
+        }
+
+        internal Exception Failure
+        {
+            get{ return failure; }
+            set
+            {
+                lock(mutex)
+                {
+                    failure = value;
+                }
+            }
+        }
+
+        public int Timeout
+        {
+            get { return this.timeout; }
+            set { this.timeout = value; }
+        }
+
+        public TimeSpan RequestTimeout
+        {
+            get { return requestTimeout; }
+            set { requestTimeout = value; }
+        }
+        
+        public int InitialReconnectDelay
+        {
+            get { return initialReconnectDelay; }
+            set { initialReconnectDelay = value; }
+        }
+
+        public int MaxReconnectDelay
+        {
+            get { return maxReconnectDelay; }
+            set { maxReconnectDelay = value; }
+        }
+
+        public int ReconnectDelay
+        {
+            get { return reconnectDelay; }
+            set { reconnectDelay = value; }
+        }
+
+        public int ReconnectDelayExponent
+        {
+            get { return backOffMultiplier; }
+            set { backOffMultiplier = value; }
+        }
+
+        public ITransport ConnectedTransport
+        {
+            get { return connectedTransport.Value; }
+            set { connectedTransport.Value = value; }
+        }
+
+        public Uri ConnectedTransportURI
+        {
+            get { return connectedTransportURI; }
+            set { connectedTransportURI = value; }
+        }
+
+        public int MaxReconnectAttempts
+        {
+            get { return maxReconnectAttempts; }
+            set { maxReconnectAttempts = value; }
+        }
+
+        public bool Randomize
+        {
+            get { return randomize; }
+            set { randomize = value; }
+        }
+
+        public bool UseExponentialBackOff
+        {
+            get { return useExponentialBackOff; }
+            set { useExponentialBackOff = value; }
+        }
+
+        #endregion
+        
+        public bool IsFaultTolerant
+        {
+            get { return true; }
+        }
+
+        public bool IsDisposed
+        {
+            get { return disposed; }
+        }
+
+        public bool IsConnected
+        {
+            get { return connected; }
+        }
+
+        public bool IsStarted
+        {
+            get { return started; }
+        }
+                   
+        /// <summary>Checks for a shutdown-type command: one flagged IsShutdownInfo or of type RemoveInfo.
+        /// </summary>
+        /// <param name="command">The command to test; null yields false.</param>
+        /// <returns>Returns true if the command is one sent when a connection is being closed.</returns>
+        private bool IsShutdownCommand(Command command)
+        {
+            return (command != null && (command.IsShutdownInfo || command is RemoveInfo));
+        }
+        
+        /// <summary>
+        /// Handles an error reported for the underlying transport by delegating
+        /// to HandleTransportFailure.  Never throws: an error raised while
+        /// handling the failure is logged instead.
+        /// </summary>
+        /// <param name="sender">The transport that reported the error (not used here).</param>
+        /// <param name="error">The error reported by the transport.</param>
+        public void OnException(ITransport sender, Exception error)
+        {
+            try
+            {
+                HandleTransportFailure(error);
+            }
+            catch(Exception e)
+            {
+                // This is already the failure path, so there is nowhere useful to
+                // rethrow to; record the problem rather than discarding it silently.
+                Tracer.WarnFormat("FailoverTransport failed to handle a transport failure: {0}", e.Message);
+            }
+        }
+
+        public void disposedOnCommand(ITransport sender, Command c)
+        {
+        }
+
+        public void disposedOnException(ITransport sender, Exception e)
+        {
+        }
+
+        /// <summary>
+        /// Detaches the currently connected transport after a failure, stops it,
+        /// clears the connection state and, if this transport has been started,
+        /// wakes the reconnect task.  Finally notifies the Interrupted handler,
+        /// when one is set.  Does nothing when no transport is connected.
+        /// </summary>
+        /// <param name="e">The error that caused the transport to fail.</param>
+        public void HandleTransportFailure(Exception e)
+        {
+            // Take the failed transport out of its slot; a later call then sees
+            // null and returns without doing the work a second time.
+            ITransport transport = connectedTransport.GetAndSet(null);
+            if(transport != null)
+            {
+                // Point the failed transport's callbacks at the no-op handlers.
+                transport.Command = new CommandHandler(disposedOnCommand);
+                transport.Exception = new ExceptionHandler(disposedOnException);
+                try
+                {
+                    transport.Stop();
+                }
+                catch(Exception ex)
+                {
+                    // Stopping a broken transport is best effort; log and carry on.
+                    Tracer.DebugFormat("Error while stopping failed transport: {0}", ex.Message);
+                }
+
+                lock(reconnectMutex)
+                {
+                    bool reconnectOk = false;
+                    if(started)
+                    {
+                        // Pass the Uri itself as the format argument instead of
+                        // calling ToString() on it: the Uri may be null here, and
+                        // a null argument simply formats as an empty string.
+                        Tracer.WarnFormat("Transport failed to {0}, attempting to automatically reconnect due to: {1}", ConnectedTransportURI, e != null ? e.Message : null);
+                        reconnectOk = true;
+                    }
+
+                    failedConnectTransportURI = ConnectedTransportURI;
+                    ConnectedTransportURI = null;
+                    connected = false;
+                    if(reconnectOk)
+                    {
+                        reconnectTask.Wakeup();
+                    }
+                }
+
+                if(this.Interrupted != null)
+                {
+                    this.Interrupted(transport);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Starts the transport.  If a transport is already connected, the state
+        /// tracker restores its state onto it; otherwise Reconnect() is called.
+        /// Calling Start() while already started only logs a debug message.
+        /// </summary>
+        public void Start()
+        {
+            lock(reconnectMutex)
+            {
+                if(started)
+                {
+                    Tracer.Debug("FailoverTransport Already Started.");
+                    return;
+                }
+
+                Tracer.Debug("FailoverTransport Started.");
+                started = true;
+
+                // Read the reference once.  HandleTransportFailure clears it
+                // before taking reconnectMutex, so a second read after the null
+                // check could hand null to DoRestore.
+                ITransport transport = ConnectedTransport;
+                if(transport != null)
+                {
+                    stateTracker.DoRestore(transport);
+                }
+                else
+                {
+                    Reconnect();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Stops this transport: marks it stopped and disposed, shuts down the
+        /// reconnect task and stops the connected transport, if there is one.
+        /// Calling Stop() while not started only logs a debug message.
+        /// </summary>
+        public virtual void Stop()
+        {
+            ITransport transportToStop;
+
+            lock(reconnectMutex)
+            {
+                if(!started)
+                {
+                    Tracer.Debug("FailoverTransport Already Stopped.");
+                    return;
+                }
+
+                Tracer.Debug("FailoverTransport Stopped.");
+                started = false;
+                disposed = true;
+                connected = false;
+
+                // One swap replaces the former check-then-swap; it yields null
+                // when nothing is connected.
+                transportToStop = connectedTransport.GetAndSet(null);
+            }
+
+            // Acquire sleepMutex and release it straight away.  The wait is kept
+            // out of a try/finally so that a failed WaitOne() is never followed by
+            // a ReleaseMutex() on a mutex this thread does not own, which would
+            // throw and hide the original error.
+            // NOTE(review): presumably this waits out a reconnect back-off sleep
+            // that holds sleepMutex - confirm against the reconnect logic.
+            sleepMutex.WaitOne();
+            sleepMutex.ReleaseMutex();
+
+            if(reconnectTask != null)
+            {
+                reconnectTask.Shutdown();
+            }
+
+            if(transportToStop != null)
+            {
+                transportToStop.Stop();
+            }
+        }
+
+        public FutureResponse AsyncRequest(Command command)
+        {
+            throw new ApplicationException("FailoverTransport does not implement AsyncRequest(Command)");
+        }
+
+        public Response Request(Command command)
+        {
+            throw new ApplicationException("FailoverTransport does not implement Request(Command)");
+        }
+
+        public Response Request(Command command, TimeSpan ts)
+        {
+            throw new ApplicationException("FailoverTransport does not implement Request(Command, TimeSpan)");
+        }
+        
+        /// <summary>
+        /// Receives a command.  For a response, the entry with the matching
+        /// correlation id is removed from the request map and, when that entry is
+        /// a Tracked command, its onResponses() callback is invoked.  The command
+        /// is then passed on to the registered Command handler.
+        /// </summary>
+        /// <param name="sender">The transport passed along to the Command handler.</param>
+        /// <param name="command">The received command; may be null.</param>
+        public void OnCommand(ITransport sender, Command command)
+        {
+            if(command != null && command.IsResponse)
+            {
+                int correlationId = ((Response) command).CorrelationId;
+                Command pending = null;
+
+                lock(((ICollection) requestMap).SyncRoot)
+                {
+                    // One lookup instead of ContainsKey + indexer.  None of these
+                    // dictionary calls throws for an int key, so no catch-all is
+                    // needed around them.
+                    if(requestMap.TryGetValue(correlationId, out pending))
+                    {
+                        requestMap.Remove(correlationId);
+                    }
+                }
+
+                // Run the callback outside the lock, as before.
+                Tracked tracked = pending as Tracked;
+                if(tracked != null)
+                {
+                    tracked.onResponses();
+                }
+            }
+
+            this.Command(sender, command);
+        }
+
+        /// <summary>
+        /// Sends a command through the currently connected transport.  When no
+        /// transport is connected it waits for one, and when a send fails it
+        /// reports the transport failure and tries again, until the command is
+        /// sent or this transport is disposed.
+        /// </summary>
+        /// <remarks>
+        /// While disconnected, a ShutdownInfo command is dropped and a RemoveInfo
+        /// command is answered with a locally created Response.  Nothing is thrown
+        /// once this transport has been disposed.
+        /// </remarks>
+        /// <param name="command">The command to send.</param>
+        /// <exception cref="IOException">
+        /// The wait for a transport timed out.  When a connection failure has been
+        /// recorded, that exception is thrown instead.
+        /// </exception>
+        public void Oneway(Command command)
+        {
+            Exception error = null;
+
+            lock(reconnectMutex)
+            {
+                if(IsShutdownCommand(command) && ConnectedTransport == null)
+                {
+                    if(command.IsShutdownInfo)
+                    {
+                        // Skipping send of ShutdownInfo command when not connected.
+                        return;
+                    }
+
+                    if(command is RemoveInfo)
+                    {
+                        // Simulate response to RemoveInfo command
+                        Response response = new Response();
+                        response.CorrelationId = command.CommandId;
+                        OnCommand(this, response);
+                        return;
+                    }
+                }
+
+                // Keep trying until the message is sent.
+                for(int i = 0; !disposed; i++)
+                {
+                    try
+                    {
+                        // Wait for transport to be connected.
+                        ITransport transport = ConnectedTransport;
+                        // UTC is used for the elapsed-time measurement so that a
+                        // local clock shift (e.g. daylight saving) cannot distort it.
+                        DateTime start = DateTime.UtcNow;
+                        bool timedout = false;
+                        while(transport == null && !disposed && connectionFailure == null)
+                        {
+                            Tracer.Info("Waiting for transport to reconnect.");
+
+                            int elapsed = (int)(DateTime.UtcNow - start).TotalMilliseconds;
+                            if( this.timeout > 0 && elapsed > timeout )
+                            {
+                                timedout = true;
+                                Tracer.DebugFormat("FailoverTransport.oneway - timed out after {0} mills", elapsed );
+                                break;
+                            }
+
+                            // Release so that the reconnect task can run
+                            try
+                            {
+                                // Gives up the lock for up to one second, or until
+                                // another thread pulses reconnectMutex.
+                                Monitor.Wait(reconnectMutex, 1000);
+                            }
+                            catch(ThreadInterruptedException e)
+                            {
+                                Tracer.DebugFormat("Interrupted: {0}", e.Message);
+                            }
+
+                            transport = ConnectedTransport;
+                        }
+
+                        if(transport == null)
+                        {
+                            // Previous loop may have exited due to us being disposed.
+                            if(disposed)
+                            {
+                                error = new IOException("Transport disposed.");
+                            }
+                            else if(connectionFailure != null)
+                            {
+                                error = connectionFailure;
+                            }
+                            else if(timedout)
+                            {
+                                error = new IOException("Failover oneway timed out after "+ timeout +" milliseconds.");
+                            }
+                            else
+                            {
+                                error = new IOException("Unexpected failure.");
+                            }
+                            break;
+                        }
+
+                        // If it was a request and it was not being tracked by
+                        // the state tracker, then hold it in the requestMap so
+                        // that we can replay it later.
+                        Tracked tracked = stateTracker.track(command);
+                        lock(((ICollection) requestMap).SyncRoot)
+                        {
+                            if(tracked != null && tracked.WaitingForResponse)
+                            {
+                                requestMap.Add(command.CommandId, tracked);
+                            }
+                            else if(tracked == null && command.ResponseRequired)
+                            {
+                                requestMap.Add(command.CommandId, command);
+                            }
+                        }
+
+                        // Send the message.
+                        try
+                        {
+                            transport.Oneway(command);
+                            stateTracker.trackBack(command);
+                        }
+                        catch(Exception)
+                        {
+                            // If the command was not tracked.. we will retry in
+                            // this method
+                            if(tracked == null)
+                            {
+
+                                // since we will retry in this method.. take it
+                                // out of the request map so that it is not
+                                // sent 2 times on recovery
+                                if(command.ResponseRequired)
+                                {
+                                    lock(((ICollection) requestMap).SyncRoot)
+                                    {
+                                        requestMap.Remove(command.CommandId);
+                                    }
+                                }
+
+                                // Rethrow the exception so it will be handled by
+                                // the outer catch.  A bare "throw;" keeps the
+                                // original stack trace, unlike "throw e;".
+                                throw;
+                            }
+
+                            // A tracked command is not retried here: the failure
+                            // is dropped and the method returns below.
+                        }
+
+                        return;
+
+                    }
+                    catch(Exception e)
+                    {
+                        Tracer.DebugFormat("Send Oneway attempt: {0} failed: Message = {1}", i, e.Message);
+                        Tracer.DebugFormat("Failed Message Was: {0}", command);
+                        HandleTransportFailure(e);
+                    }
+                }
+            }
+
+            if(!disposed)
+            {
+                if(error != null)
+                {
+                    throw error;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Registers every URI in <paramref name="u"/> that is not already in
+        /// the known URI list, then requests a reconnect.
+        /// </summary>
+        /// <param name="u">The URIs to add to the failover list.</param>
+        public void Add(Uri[] u)
+        {
+            // Mutations of the URI list are serialized on the list itself.
+            lock(uris)
+            {
+                foreach(Uri candidate in u)
+                {
+                    if(uris.Contains(candidate))
+                    {
+                        continue;
+                    }
+
+                    uris.Add(candidate);
+                }
+            }
+
+            // Requested outside of the list lock.
+            Reconnect();
+        }
+
+        /// <summary>
+        /// Removes every URI in <paramref name="u"/> from the known URI list,
+        /// then requests a reconnect.
+        /// </summary>
+        /// <param name="u">The URIs to drop from the failover list.</param>
+        public void Remove(Uri[] u)
+        {
+            lock(uris)
+            {
+                foreach(Uri target in u)
+                {
+                    uris.Remove(target);
+                }
+            }
+
+            Reconnect();
+        }
+
+        /// <summary>
+        /// Parses <paramref name="u"/> as a URI, adds it to the known URI list
+        /// if it is not already present, and requests a reconnect.
+        /// </summary>
+        /// <remarks>
+        /// Any exception raised inside the method (parsing, list update or the
+        /// reconnect request) is caught and reported through the tracer; nothing
+        /// is propagated to the caller.
+        /// </remarks>
+        /// <param name="u">String form of the URI to add.</param>
+        public void Add(String u)
+        {
+            try
+            {
+                Uri parsed = new Uri(u);
+
+                lock(uris)
+                {
+                    bool alreadyKnown = uris.Contains(parsed);
+                    if(!alreadyKnown)
+                    {
+                        uris.Add(parsed);
+                    }
+                }
+
+                Reconnect();
+            }
+            catch(Exception failure)
+            {
+                Tracer.ErrorFormat("Failed to parse URI '{0}': {1}", u, failure.Message);
+            }
+        }
+
+        /// <summary>
+        /// Adds a single URI to the failover list; the array overload of Add
+        /// then requests the reconnect.
+        /// </summary>
+        /// <param name="uri">The URI to add.</param>
+        public void Reconnect(Uri uri)
+        {
+            Uri[] single = new Uri[1];
+            single[0] = uri;
+            Add(single);
+        }
+        
+        /// <summary>
+        /// Wakes the reconnect task, creating it on first use. Does nothing
+        /// except log when the transport has not been started.
+        /// </summary>
+        public void Reconnect()
+        {
+            lock(reconnectMutex)
+            {
+                if(!started)
+                {
+                    Tracer.Debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
+                    return;
+                }
+
+                // The task runner is created lazily, under the reconnect mutex.
+                if(reconnectTask == null)
+                {
+                    Tracer.Debug("Creating reconnect task");
+                    reconnectTask = new DedicatedTaskRunner(new FailoverTask(this));
+                }
+
+                Tracer.Debug("Waking up reconnect task");
+                try
+                {
+                    reconnectTask.Wakeup();
+                }
+                catch(ThreadInterruptedException)
+                {
+                    // An interrupt while waking the task is deliberately ignored.
+                }
+            }
+        }
+
+        /// <summary>
+        /// Builds the ordered list of URIs to try on the next connect attempt.
+        /// </summary>
+        /// <remarks>
+        /// The result is a private copy of the known URIs. When Randomize is
+        /// set the copy is shuffled. If the URI recorded in
+        /// failedConnectTransportURI is part of the list it is always moved to
+        /// the end, so it is tried last.
+        /// </remarks>
+        private List<Uri> ConnectList
+        {
+            get
+            {
+                // Add/Remove mutate 'uris' under lock(uris); take the snapshot
+                // under the same lock so the copy cannot observe a list that
+                // is being modified.
+                List<Uri> l;
+                lock(uris)
+                {
+                    l = new List<Uri>(uris);
+                }
+
+                // Read the field once so the URI removed here is the same one
+                // that is re-appended below.
+                Uri lastFailed = failedConnectTransportURI;
+                bool removed = false;
+                if(lastFailed != null)
+                {
+                    removed = l.Remove(lastFailed);
+                }
+
+                if(Randomize)
+                {
+                    // Fisher-Yates shuffle: every permutation is equally
+                    // likely. The default Random seed is used instead of
+                    // DateTime.Now.Millisecond, which only has 1000 values.
+                    Random r = new Random();
+                    for(int i = l.Count - 1; i > 0; i--)
+                    {
+                        int p = r.Next(i + 1);
+                        Uri t = l[p];
+                        l[p] = l[i];
+                        l[i] = t;
+                    }
+                }
+
+                if(removed)
+                {
+                    l.Add(lastFailed);
+                }
+
+                return l;
+            }
+        }
+
+        /// <summary>
+        /// Starts the given transport, lets the state tracker restore its
+        /// state onto it, and then resends every command currently held in
+        /// the request map.
+        /// </summary>
+        /// <param name="t">The newly connected transport to restore onto.</param>
+        protected void RestoreTransport(ITransport t)
+        {
+            Tracer.Info("Restoring previous transport connection.");
+            t.Start();
+
+            stateTracker.DoRestore(t);
+
+            Tracer.Info("Sending queued commands...");
+
+            // Snapshot the pending commands under the map's lock so the
+            // resend loop below runs without holding it.
+            List<Command> pending;
+            lock(((ICollection) requestMap).SyncRoot)
+            {
+                pending = new List<Command>(requestMap.Values);
+            }
+
+            foreach(Command queued in pending)
+            {
+                t.Oneway(queued);
+            }
+        }
+        
+        /// <summary>
+        /// The remote address reported by the currently connected transport,
+        /// or null when no transport is connected.
+        /// </summary>
+        public Uri RemoteAddress
+        {
+            get
+            {
+                // Read ConnectedTransport once: it can be cleared between a
+                // null check and a second read, which would turn this getter
+                // into a NullReferenceException.
+                ITransport current = ConnectedTransport;
+                if(current != null)
+                {
+                    return current.RemoteAddress;
+                }
+                return null;
+            }
+        }
+
+        /// <summary>
+        /// Returns this transport when its exact runtime type equals
+        /// <paramref name="type"/>; otherwise delegates the lookup to the
+        /// connected transport.
+        /// </summary>
+        /// <param name="type">The transport type being looked for.</param>
+        /// <returns>
+        /// The matching transport, or null when this instance does not match
+        /// and there is no connected transport to ask.
+        /// </returns>
+        public Object Narrow(Type type)
+        {
+            if(this.GetType().Equals(type))
+            {
+                return this;
+            }
+
+            // Single read of ConnectedTransport so a concurrent reset to null
+            // cannot slip in between the check and the call.
+            ITransport current = ConnectedTransport;
+            if(current != null)
+            {
+                return current.Narrow(type);
+            }
+
+            return null;
+        }
+
+        /// <summary>
+        /// Performs one connect attempt over the current connect list.
+        /// </summary>
+        /// <remarks>
+        /// Under the reconnect mutex each candidate URI is tried in order
+        /// until one connects. On success the transport is wired to this
+        /// instance's handlers, started, restored (when this transport is
+        /// started) and published as the connected transport. On failure the
+        /// failure counter is advanced and, once MaxReconnectAttempts is
+        /// reached, the failure is recorded and reported to the Exception
+        /// handler. If the attempt neither succeeded nor gave up, the method
+        /// sleeps for ReconnectDelay and optionally grows the delay.
+        /// </remarks>
+        /// <returns>
+        /// false when already connected, disposed, permanently failed, just
+        /// connected, or out of attempts; otherwise the negation of the
+        /// disposed flag after the back-off sleep. NOTE(review): presumably
+        /// "true" asks the task runner to iterate again - confirm against
+        /// FailoverTask.
+        /// </returns>
+        private bool DoConnect()
+        {
+            lock(reconnectMutex)
+            {
+                if(ConnectedTransport != null || disposed || connectionFailure != null)
+                {
+                    return false;
+                }
+                else
+                {
+                    List<Uri> connectList = ConnectList;
+                    if(connectList.Count == 0)
+                    {
+                        Failure = new NMSConnectionException("No URIs available for connection.");
+                    }
+                    else
+                    {
+                        // Without exponential back-off every attempt uses the
+                        // initial delay.
+                        if(!UseExponentialBackOff)
+                        {
+                            ReconnectDelay = InitialReconnectDelay;
+                        }
+
+                        ITransport transport = null;
+                        Uri chosenUri = null;
+
+                        try
+                        {
+                            foreach(Uri uri in connectList)
+                            {
+                                if(ConnectedTransport != null || disposed)
+                                {
+                                    break;
+                                }
+
+                                Tracer.DebugFormat("Attempting connect to: {0}", uri);
+
+                                // synchronous connect
+                                try
+                                {
+                                    Tracer.DebugFormat("Attempting connect to: {0}", uri.ToString());
+                                    transport = TransportFactory.CompositeConnect(uri);
+                                    chosenUri = transport.RemoteAddress;
+                                    break;
+                                }
+                                catch(Exception e)
+                                {
+                                    // Remember the failure and move on to the
+                                    // next candidate URI.
+                                    Failure = e;
+                                    Tracer.DebugFormat("Connect fail to: {0}, reason: {1}", uri, e.Message);
+                                }
+                            }
+
+                            if(transport != null)
+                            {
+                                transport.Command = new CommandHandler(OnCommand);
+                                transport.Exception = new ExceptionHandler(OnException);
+                                transport.Start();
+
+                                if(started)
+                                {
+                                    RestoreTransport(transport);
+                                }
+
+                                if(this.Resumed != null)
+                                {
+                                    this.Resumed(transport);
+                                }
+
+                                Tracer.Debug("Connection established");
+                                ReconnectDelay = InitialReconnectDelay;
+                                ConnectedTransportURI = chosenUri;
+                                ConnectedTransport = transport;
+                                connectFailures = 0;
+                                connected = true;
+
+                                if(firstConnection)
+                                {
+                                    firstConnection = false;
+                                    Tracer.InfoFormat("Successfully connected to: {0}", chosenUri.ToString());
+                                }
+                                else
+                                {
+                                    Tracer.InfoFormat("Successfully reconnected to: {0}", chosenUri.ToString());
+                                }
+
+                                return false;
+                            }
+                        }
+                        catch(Exception e)
+                        {
+                            // Start/restore/resume failed after the connect
+                            // itself succeeded; treat it as a failed attempt.
+                            Failure = e;
+                            Tracer.DebugFormat("Connect attempt failed.  Reason: {0}", e.Message);
+                        }
+                    }
+
+                    if(MaxReconnectAttempts > 0 && ++connectFailures >= MaxReconnectAttempts)
+                    {
+                        Tracer.ErrorFormat("Failed to connect to transport after {0} attempt(s)", connectFailures);
+                        connectionFailure = Failure;
+
+                        // Guard the handler the same way Resumed is guarded
+                        // above: with no handler assigned, invoking it would
+                        // throw a NullReferenceException on the reconnect
+                        // thread.
+                        ExceptionHandler handler = this.Exception;
+                        if(handler != null)
+                        {
+                            handler(this, connectionFailure);
+                        }
+                        return false;
+                    }
+                }
+            }
+
+            if(!disposed)
+            {
+                Tracer.DebugFormat("Waiting {0}ms before attempting connection.", ReconnectDelay);
+                lock(sleepMutex)
+                {
+                    try
+                    {
+                        Thread.Sleep(ReconnectDelay);
+                    }
+                    catch(ThreadInterruptedException)
+                    {
+                        // An interrupt simply cuts the back-off sleep short.
+                    }
+                }
+
+                if(UseExponentialBackOff)
+                {
+                    // Exponential increment of reconnect delay.
+                    ReconnectDelay *= ReconnectDelayExponent;
+                    if(ReconnectDelay > MaxReconnectDelay)
+                    {
+                        ReconnectDelay = MaxReconnectDelay;
+                    }
+                }
+            }
+            return !disposed;
+        }
+
+        /// <summary>
+        /// Disposes this transport and tells the garbage collector that
+        /// finalization is no longer required for it.
+        /// </summary>
+        public void Dispose()
+        {
+            this.Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        /// <summary>
+        /// Marks this transport as disposed.
+        /// </summary>
+        /// <param name="disposing">
+        /// Currently has no effect: the disposed flag is set either way.
+        /// </param>
+        public void Dispose(bool disposing)
+        {
+            // Nothing is released here at present; only the flag is set.
+            disposed = true;
+        }
+
+        /// <summary>
+        /// Orders failover transports by their id.
+        /// </summary>
+        /// <param name="o">The object to compare with.</param>
+        /// <returns>
+        /// A negative value, zero or a positive value when this instance's id
+        /// is less than, equal to or greater than the other's.
+        /// </returns>
+        /// <exception cref="ArgumentException">
+        /// <paramref name="o"/> is null or not a FailoverTransport.
+        /// </exception>
+        public int CompareTo(Object o)
+        {
+            // One cast instead of an 'is' test followed by 'as'.
+            FailoverTransport other = o as FailoverTransport;
+            if(other == null)
+            {
+                throw new ArgumentException();
+            }
+
+            // Compare rather than subtract: "this.id - other.id" can overflow
+            // and yield the wrong sign for ids that are far apart.
+            return this.id.CompareTo(other.id);
+        }
+
+        /// <summary>
+        /// Returns the URI of the connected transport, or "unconnected" when
+        /// no connected URI is recorded.
+        /// </summary>
+        public override String ToString()
+        {
+            // Read the property once so a concurrent reset to null between
+            // the check and the call cannot cause a NullReferenceException.
+            Uri current = ConnectedTransportURI;
+            return current == null ? "unconnected" : current.ToString();
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs?rev=961033&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs Tue Jul  6 22:19:34 2010
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Specialized;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Stomp.Transport.Failover
+{
+	/// <summary>
+	/// Transport factory that builds <see cref="FailoverTransport"/> instances
+	/// from a composite URI.
+	/// </summary>
+	public class FailoverTransportFactory : ITransportFactory
+	{
+		/// <summary>
+		/// Creates a bare failover transport for the composite URI, without
+		/// the mutex and response-correlator wrappers.
+		/// </summary>
+		/// <param name="location">Composite URI describing the failover set.</param>
+		public ITransport CompositeConnect(Uri location)
+		{
+			URISupport.CompositeData composite = URISupport.parseComposite(location);
+			return CreateTransport(composite);
+		}
+
+		/// <summary>
+		/// Asynchronous composite connect is not supported; always throws.
+		/// </summary>
+		/// <exception cref="NMSConnectionException">Always.</exception>
+		public ITransport CompositeConnect(Uri location, SetTransport setTransport)
+		{
+			throw new NMSConnectionException("Asynchronous composite connection not supported with Failover transport.");
+		}
+
+		/// <summary>
+		/// Creates a failover transport for the composite URI and wraps it
+		/// in a MutexTransport and then a ResponseCorrelator.
+		/// </summary>
+		/// <param name="location">Composite URI describing the failover set.</param>
+		public ITransport CreateTransport(Uri location)
+		{
+			ITransport failover = CreateTransport(URISupport.parseComposite(location));
+			ITransport guarded = new MutexTransport(failover);
+			return new ResponseCorrelator(guarded);
+		}
+
+		/// <summary>
+		/// Creates a failover transport configured from the composite's
+		/// parameters and seeded with the composite's component URIs.
+		/// </summary>
+		/// <param name="compositData">Parsed composite URI data.</param>
+		/// <returns>The configured failover transport.</returns>
+		public ITransport CreateTransport(URISupport.CompositeData compositData)
+		{
+			FailoverTransport failover = CreateTransport(compositData.Parameters);
+			failover.Add(compositData.Components);
+			return failover;
+		}
+
+		/// <summary>
+		/// Creates a failover transport and applies the given parameters to
+		/// its properties.
+		/// </summary>
+		/// <param name="parameters">Option names and values to apply.</param>
+		public FailoverTransport CreateTransport(StringDictionary parameters)
+		{
+			FailoverTransport created = new FailoverTransport();
+			URISupport.SetProperties(created, parameters, "");
+			return created;
+		}
+	}
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ICompositeTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ICompositeTransport.cs?rev=961033&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ICompositeTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ICompositeTransport.cs Tue Jul  6 22:19:34 2010
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.Stomp.Transport
+{
+	/// <summary>
+	/// A transport whose set of URIs can be changed after it has been created.
+	/// </summary>
+	public interface ICompositeTransport : ITransport
+	{
+		/// <summary>
+		/// Adds the given URIs to the transport.
+		/// </summary>
+		/// <param name="uris">The URIs to add.</param>
+		void Add(Uri[] uris);
+
+		/// <summary>
+		/// Removes the given URIs from the transport.
+		/// </summary>
+		/// <param name="uris">The URIs to remove.</param>
+		void Remove(Uri[] uris);
+	}
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ICompositeTransport.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ICompositeTransport.cs
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs?rev=961033&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs Tue Jul  6 22:19:34 2010
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Stomp.Threads;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Stomp.Transport
+{
+    /// <summary>
+    /// Transport filter that keeps the connection active: a timer
+    /// periodically checks whether any command has been sent since the
+    /// previous check and, if not, a KeepAliveInfo is sent on a dedicated
+    /// task thread.
+    /// </summary>
+    public class InactivityMonitor : TransportFilter
+    {
+        // True while the timer and the keep-alive task runner exist.
+        private Atomic<bool> monitorStarted = new Atomic<bool>(false);
+
+        // Set whenever Oneway completes; cleared by each write check.
+        private Atomic<bool> commandSent = new Atomic<bool>(false);
+        // Set whenever a command arrives; not read anywhere in this class.
+        private Atomic<bool> commandReceived = new Atomic<bool>(false);
+
+        // Set once by the first exception; further sends are then rejected.
+        private Atomic<bool> failed = new Atomic<bool>(false);
+        // True while OnCommand is running; not read anywhere in this class.
+        private Atomic<bool> inRead = new Atomic<bool>(false);
+        // True while Oneway is running; also used as the Oneway lock object.
+        private Atomic<bool> inWrite = new Atomic<bool>(false);
+
+        private DedicatedTaskRunner asyncTask;
+        private AsyncWriteTask asyncWriteTask;
+
+        // Only ever used as a monitor object via lock(monitor).
+        private readonly Mutex monitor = new Mutex();
+
+        private Timer connectionCheckTimer;
+
+        private long maxInactivityDuration = 10000;
+        /// <summary>
+        /// Period, in milliseconds, between write checks. Zero disables the
+        /// monitor. Defaults to 10000.
+        /// </summary>
+        public long MaxInactivityDuration
+        {
+            get { return this.maxInactivityDuration; }
+            set { this.maxInactivityDuration = value; }
+        }
+
+        private long maxInactivityDurationInitialDelay = 10000;
+        /// <summary>
+        /// Delay, in milliseconds, before the first write check runs.
+        /// Defaults to 10000.
+        /// </summary>
+        public long MaxInactivityDurationInitialDelay
+        {
+            get { return this.maxInactivityDurationInitialDelay; }
+            set { this.maxInactivityDurationInitialDelay = value; }
+        }
+
+        /// <summary>
+        /// Constructor for the Inactivity Monitor
+        /// </summary>
+        /// <param name="next">The transport this filter wraps.</param>
+        public InactivityMonitor(ITransport next)
+            : base(next)
+        {
+            Tracer.Debug("Creating Inactivity Monitor");
+        }
+
+        ~InactivityMonitor()
+        {
+            Dispose(false);
+        }
+
+        /// <summary>
+        /// Stops the monitor's timer and task thread, then disposes the base
+        /// filter.
+        /// </summary>
+        protected override void Dispose(bool disposing)
+        {
+            if(disposing)
+            {
+                // get rid of unmanaged stuff
+            }
+
+            StopMonitorThreads();
+
+            base.Dispose(disposing);
+        }
+
+        #region WriteCheck Related
+        /// <summary>
+        /// Timer callback: if nothing was sent since the previous check,
+        /// wakes the task runner so that it sends a KeepAliveInfo. Skipped
+        /// while a write is in progress or after the monitor has failed.
+        /// </summary>
+        /// <param name="unused">Timer state object; not used.</param>
+        public void WriteCheck(object unused)
+        {
+            if(this.inWrite.Value || this.failed.Value)
+            {
+                Tracer.Debug("Inactivity Monitor is in write or already failed.");
+                return;
+            }
+
+            if(!commandSent.Value)
+            {
+                Tracer.Debug("No Message sent since last write check. Sending a KeepAliveInfo");
+
+                // StopMonitorThreads only waits a bounded time for the timer
+                // before setting asyncTask to null, so a late callback can
+                // still get here. Snapshot the field and skip the wakeup when
+                // the runner is already gone instead of throwing a
+                // NullReferenceException on the timer thread.
+                DedicatedTaskRunner runner = this.asyncTask;
+                if(runner != null)
+                {
+                    runner.Wakeup();
+                }
+            }
+            else
+            {
+                Tracer.Debug("Message sent since last write check. Resetting flag");
+            }
+
+            commandSent.Value = false;
+        }
+        #endregion
+
+        /// <summary>
+        /// Stops the monitor's timer and task thread, then stops the wrapped
+        /// transport.
+        /// </summary>
+        public override void Stop()
+        {
+            StopMonitorThreads();
+            next.Stop();
+        }
+
+        /// <summary>
+        /// Records that a command was received, makes sure the monitor is
+        /// running, and forwards the command to the base filter.
+        /// </summary>
+        protected override void OnCommand(ITransport sender, Command command)
+        {
+            commandReceived.Value = true;
+            inRead.Value = true;
+            try
+            {
+                try
+                {
+                    // The monitor is started on the first received command;
+                    // later calls return immediately once it is running.
+                    StartMonitorThreads();
+                }
+                catch(IOException ex)
+                {
+                    OnException(this, ex);
+                }
+
+                base.OnCommand(sender, command);
+            }
+            finally
+            {
+                inRead.Value = false;
+            }
+        }
+
+        /// <summary>
+        /// Sends a command through the wrapped transport, one caller at a
+        /// time.
+        /// </summary>
+        /// <exception cref="IOException">
+        /// The monitor has already recorded a failure.
+        /// </exception>
+        public override void Oneway(Command command)
+        {
+            // Disable inactivity monitoring while processing a command.
+            // synchronize this method - its not synchronized
+            // further down the transport stack and gets called by more
+            // than one thread  by this class
+            lock(inWrite)
+            {
+                inWrite.Value = true;
+                try
+                {
+                    if(failed.Value)
+                    {
+                        throw new IOException("Channel was inactive for too long: " + next.RemoteAddress.ToString());
+                    }
+
+                    next.Oneway(command);
+                }
+                finally
+                {
+                    // Marked as sent even when the send threw.
+                    commandSent.Value = true;
+                    inWrite.Value = false;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Handles the first exception only: marks the monitor as failed,
+        /// stops the monitor threads and forwards the exception to the base
+        /// filter. Later exceptions are ignored.
+        /// </summary>
+        protected override void OnException(ITransport sender, Exception command)
+        {
+            if(failed.CompareAndSet(false, true))
+            {
+                Tracer.Debug("Exception received in the Inactivity Monitor: " + command.ToString());
+                StopMonitorThreads();
+                base.OnException(sender, command);
+            }
+        }
+
+        /// <summary>
+        /// Creates the keep-alive task runner and the write-check timer,
+        /// unless the monitor is already running or MaxInactivityDuration
+        /// is zero.
+        /// </summary>
+        private void StartMonitorThreads()
+        {
+            lock(monitor)
+            {
+                if(monitorStarted.Value || maxInactivityDuration == 0)
+                {
+                    return;
+                }
+
+                Tracer.DebugFormat("Inactivity: Write Check time interval: {0}", maxInactivityDuration );
+                Tracer.DebugFormat("Inactivity: Initial Delay time interval: {0}", maxInactivityDurationInitialDelay );
+
+                this.asyncWriteTask = new AsyncWriteTask(this);
+                this.asyncTask = new DedicatedTaskRunner(this.asyncWriteTask);
+
+                monitorStarted.Value = true;
+
+                this.connectionCheckTimer = new Timer(
+                    new TimerCallback(WriteCheck),
+                    null,
+                    maxInactivityDurationInitialDelay,
+                    maxInactivityDuration
+                    );
+            }
+        }
+
+        /// <summary>
+        /// Disposes the write-check timer and shuts down the keep-alive task
+        /// runner if the monitor is running.
+        /// </summary>
+        private void StopMonitorThreads()
+        {
+            lock(monitor)
+            {
+                if(monitorStarted.CompareAndSet(true, false))
+                {
+                    AutoResetEvent shutdownEvent = new AutoResetEvent(false);
+
+                    // Attempt to wait for the Timer to shutdown, but don't wait
+                    // forever, if they don't shutdown after two seconds, just quit.
+                    this.connectionCheckTimer.Dispose(shutdownEvent);
+                    shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000), false);
+
+                    this.asyncTask.Shutdown();
+                    this.asyncTask = null;
+                    this.asyncWriteTask = null;
+                }
+            }
+        }
+
+        #region Async Tasks
+
+        // Task that fires when the TaskRunner is signaled by the WriteCheck Timer Task.
+        class AsyncWriteTask : Task
+        {
+            private InactivityMonitor parent;
+
+            public AsyncWriteTask(InactivityMonitor parent)
+            {
+                this.parent = parent;
+            }
+
+            /// <summary>
+            /// Sends one KeepAliveInfo through the parent monitor while the
+            /// monitor is running. Always returns false.
+            /// </summary>
+            public bool Iterate()
+            {
+                Tracer.Debug("AsyncWriteTask perparing for another Write Check");
+                if(this.parent.monitorStarted.Value)
+                {
+                    try
+                    {
+                        Tracer.Debug("AsyncWriteTask Write Check required sending KeepAlive.");
+                        KeepAliveInfo info = new KeepAliveInfo();
+                        this.parent.Oneway(info);
+                    }
+                    catch(IOException e)
+                    {
+                        this.parent.OnException(parent, e);
+                    }
+                }
+
+                return false;
+            }
+        }
+        #endregion
+    }
+
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=961033&r1=961032&r2=961033&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs Tue Jul  6 22:19:34 2010
@@ -39,6 +39,16 @@ namespace Apache.NMS.Stomp.Transport.Tcp
 			set { useLogging = value; }
 		}
 
+        // Backing field for UseInactivityMonitor; enabled by default.
+        private bool useInactivityMonitor = true;
+        /// <summary>
+        /// Should the Inactivity Monitor be enabled on this Transport.
+        /// When true, the created transport is wrapped in an InactivityMonitor.
+        /// </summary>
+        public bool UseInactivityMonitor
+        {
+           get { return this.useInactivityMonitor; }
+           set { this.useInactivityMonitor = value; }
+        }
+
 		/// <summary>
 		/// Size in bytes of the receive buffer.
 		/// </summary>
@@ -128,6 +138,11 @@ namespace Apache.NMS.Stomp.Transport.Tcp
 				transport = new LoggingTransport(transport);
 			}
 
+            if(UseInactivityMonitor)
+            {
+               transport = new InactivityMonitor(transport);
+            }
+            
 			transport.RequestTimeout = this.requestTimeout;
 
 			if(setTransport != null)

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFactory.cs?rev=961033&r1=961032&r2=961033&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFactory.cs Tue Jul  6 22:19:34 2010
@@ -18,6 +18,7 @@
 using System;
 
 using Apache.NMS.Stomp.Transport.Tcp;
+using Apache.NMS.Stomp.Transport.Failover;
 
 namespace Apache.NMS.Stomp.Transport
 {
@@ -77,6 +78,9 @@ namespace Apache.NMS.Stomp.Transport
             {
                 switch(scheme.ToLower())
                 {
+                case "failover":
+                    factory = new FailoverTransportFactory();
+                    break;
                 case "tcp":
                     factory = new TcpTransportFactory();
                     break;

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MapMessageTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MapMessageTest.cs?rev=961033&r1=961032&r2=961033&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MapMessageTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MapMessageTest.cs Tue Jul  6 22:19:34 2010
@@ -56,7 +56,8 @@ namespace Apache.NMS.Stomp.Test
                 connection.Start();
                 using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
                 {
-                    string TOPIC = DESTINATION_NAME + "?consumer.transformation=jms-map-xml";
+                    string TOPIC = DESTINATION_NAME + "?consumer.transformation=jms-xml";
+//                    string TOPIC = DESTINATION_NAME;
                     IDestination destination = session.GetTopic(TOPIC);
                     using(IMessageConsumer consumer = session.CreateConsumer(destination))
                     using(IMessageProducer producer = session.CreateProducer(destination))

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj?rev=961033&r1=961032&r2=961033&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj Tue Jul  6 22:19:34 2010
@@ -60,14 +60,15 @@
     <Reference Include="nunit.framework.extensions, Version=2.4.8.0, Culture=neutral, PublicKeyToken=96d09a1eb7f44a77, processorArchitecture=MSIL">
       <SpecificVersion>False</SpecificVersion>
       <HintPath>lib\NUnit\net-2.0\nunit.framework.extensions.dll</HintPath>
+      <Package>mono-nunit</Package>
     </Reference>
     <Reference Include="System" />
     <Reference Include="System.Xml" />
-    <Reference Include="Apache.NMS, Version=1.3.0.1946, Culture=neutral, PublicKeyToken=82756feee3957618">
+    <Reference Include="Apache.NMS, Version=1.4.0.2007, Culture=neutral, PublicKeyToken=82756feee3957618">
       <SpecificVersion>False</SpecificVersion>
       <HintPath>build\mono-2.0\debug\Apache.NMS.dll</HintPath>
     </Reference>
-    <Reference Include="Apache.NMS.Stomp, Version=1.3.0.1949, Culture=neutral, PublicKeyToken=82756feee3957618">
+    <Reference Include="Apache.NMS.Stomp, Version=1.4.0.2010, Culture=neutral, PublicKeyToken=82756feee3957618">
       <SpecificVersion>False</SpecificVersion>
       <HintPath>build\mono-2.0\debug\Apache.NMS.Stomp.dll</HintPath>
     </Reference>
@@ -97,7 +98,7 @@
       <Properties>
         <Policies>
           <TextStylePolicy FileWidth="120" RemoveTrailingWhitespace="True" inheritsSet="VisualStudio" inheritsScope="text/plain" />
-          <StandardHeader Text="/*&#xA; * Licensed to the Apache Software Foundation (ASF) under one or more&#xA; * contributor license agreements.  See the NOTICE file distributed with&#xA; * this work for additional information regarding copyright ownership.&#xA; * The ASF licenses this file to You under the Apache License, Version 2.0&#xA; * (the &quot;License&quot;); you may not use this file except in compliance with&#xA; * the License.  You may obtain a copy of the License at&#xA; *&#xA; *     http://www.apache.org/licenses/LICENSE-2.0&#xA; *&#xA; * Unless required by applicable law or agreed to in writing, software&#xA; * distributed under the License is distributed on an &quot;AS IS&quot; BASIS,&#xA; * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.&#xA; * See the License for the specific language governing permissions and&#xA; * limitations under the License.&#xA; */&#xA;" inheritsSet="MITX11License" />
+          <StandardHeader Text="/*&#xA; * Licensed to the Apache Software Foundation (ASF) under one or more&#xA; * contributor license agreements.  See the NOTICE file distributed with&#xA; * this work for additional information regarding copyright ownership.&#xA; * The ASF licenses this file to You under the Apache License, Version 2.0&#xA; * (the &quot;License&quot;); you may not use this file except in compliance with&#xA; * the License.  You may obtain a copy of the License at&#xA; *&#xA; *     http://www.apache.org/licenses/LICENSE-2.0&#xA; *&#xA; * Unless required by applicable law or agreed to in writing, software&#xA; * distributed under the License is distributed on an &quot;AS IS&quot; BASIS,&#xA; * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.&#xA; * See the License for the specific language governing permissions and&#xA; * limitations under the License.&#xA; */&#xA;" inheritsSet="Apache2License" />
         </Policies>
       </Properties>
     </MonoDevelop>

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.csproj?rev=961033&r1=961032&r2=961033&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.csproj Tue Jul  6 22:19:34 2010
@@ -55,7 +55,7 @@
   <ItemGroup>
     <Reference Include="System" />
     <Reference Include="System.Xml" />
-    <Reference Include="Apache.NMS, Version=1.3.0.1893, Culture=neutral, PublicKeyToken=82756feee3957618">
+    <Reference Include="Apache.NMS, Version=1.4.0.2007, Culture=neutral, PublicKeyToken=82756feee3957618">
       <SpecificVersion>False</SpecificVersion>
       <HintPath>build\mono-2.0\debug\Apache.NMS.dll</HintPath>
     </Reference>
@@ -146,6 +146,20 @@
     <Compile Include="src\main\csharp\Protocol\XmlPrimitiveMapMarshaler.cs" />
     <Compile Include="src\main\csharp\Transport\Tcp\SslTransport.cs" />
     <Compile Include="src\main\csharp\Transport\Tcp\SslTransportFactory.cs" />
+    <Compile Include="src\main\csharp\Transport\Failover\FailoverTransport.cs" />
+    <Compile Include="src\main\csharp\Transport\Failover\FailoverTransportFactory.cs" />
+    <Compile Include="src\main\csharp\Transport\InactivityMonitor.cs" />
+    <Compile Include="src\main\csharp\Commands\KeepAliveInfo.cs" />
+    <Compile Include="src\main\csharp\State\CommandVisitorAdapter.cs" />
+    <Compile Include="src\main\csharp\State\ConnectionState.cs" />
+    <Compile Include="src\main\csharp\State\ConnectionStateTracker.cs" />
+    <Compile Include="src\main\csharp\State\ConsumerState.cs" />
+    <Compile Include="src\main\csharp\State\ICommandVisitor.cs" />
+    <Compile Include="src\main\csharp\State\ThreadSimulator.cs" />
+    <Compile Include="src\main\csharp\State\Tracked.cs" />
+    <Compile Include="src\main\csharp\Transport\ICompositeTransport.cs" />
+    <Compile Include="src\main\csharp\State\SynchronizedObjects.cs" />
+    <Compile Include="src\main\csharp\IOException.cs" />
   </ItemGroup>
   <ItemGroup>
     <None Include="keyfile\NMSKey.snk" />
@@ -157,10 +171,13 @@
       <Properties>
         <Policies>
           <TextStylePolicy FileWidth="120" RemoveTrailingWhitespace="True" inheritsSet="VisualStudio" inheritsScope="text/plain" />
-          <StandardHeader Text="/*&#xA; * Licensed to the Apache Software Foundation (ASF) under one or more&#xA; * contributor license agreements.  See the NOTICE file distributed with&#xA; * this work for additional information regarding copyright ownership.&#xA; * The ASF licenses this file to You under the Apache License, Version 2.0&#xA; * (the &quot;License&quot;); you may not use this file except in compliance with&#xA; * the License.  You may obtain a copy of the License at&#xA; *&#xA; *     http://www.apache.org/licenses/LICENSE-2.0&#xA; *&#xA; * Unless required by applicable law or agreed to in writing, software&#xA; * distributed under the License is distributed on an &quot;AS IS&quot; BASIS,&#xA; * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.&#xA; * See the License for the specific language governing permissions and&#xA; * limitations under the License.&#xA; */&#xA;" inheritsSet="MITX11License" />
+          <StandardHeader Text="/*&#xA; * Licensed to the Apache Software Foundation (ASF) under one or more&#xA; * contributor license agreements.  See the NOTICE file distributed with&#xA; * this work for additional information regarding copyright ownership.&#xA; * The ASF licenses this file to You under the Apache License, Version 2.0&#xA; * (the &quot;License&quot;); you may not use this file except in compliance with&#xA; * the License.  You may obtain a copy of the License at&#xA; *&#xA; *     http://www.apache.org/licenses/LICENSE-2.0&#xA; *&#xA; * Unless required by applicable law or agreed to in writing, software&#xA; * distributed under the License is distributed on an &quot;AS IS&quot; BASIS,&#xA; * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.&#xA; * See the License for the specific language governing permissions and&#xA; * limitations under the License.&#xA; */&#xA;" inheritsSet="Apache2License" />
         </Policies>
       </Properties>
     </MonoDevelop>
     <VisualStudio />
   </ProjectExtensions>
+  <ItemGroup>
+    <Folder Include="src\main\csharp\State\" />
+  </ItemGroup>
 </Project>
\ No newline at end of file

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.sln
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.sln?rev=961033&r1=961032&r2=961033&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.sln (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.sln Tue Jul  6 22:19:34 2010
@@ -20,11 +20,7 @@ Global
 		{E8C995C3-FF81-491B-A3B7-9D7C753BDDC3}.Release|Any CPU.ActiveCfg = Release|Any CPU
 		{E8C995C3-FF81-491B-A3B7-9D7C753BDDC3}.Release|Any CPU.Build.0 = Release|Any CPU
 	EndGlobalSection
-	GlobalSection(SolutionProperties) = preSolution
-		HideSolutionNode = FALSE
-	EndGlobalSection
 	GlobalSection(MonoDevelopProperties) = preSolution
-		version = 0.1
 		StartupItem = vs2008-stomp.csproj
 		Policies = $0
 		$0.TextStylePolicy = $1
@@ -41,4 +37,7 @@ Global
 		$3.inheritsScope = text/x-csharp
 		$3.scope = text/x-csharp
 	EndGlobalSection
+	GlobalSection(SolutionProperties) = preSolution
+		HideSolutionNode = FALSE
+	EndGlobalSection
 EndGlobal