You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/07/07 00:19:34 UTC
svn commit: r961033 - in /activemq/activemq-dotnet/Apache.NMS.Stomp/trunk:
./ src/main/csharp/ src/main/csharp/Commands/ src/main/csharp/Protocol/
src/main/csharp/Transport/ src/main/csharp/Transport/Failover/
src/main/csharp/Transport/Tcp/ src/test/cs...
Author: tabish
Date: Tue Jul 6 22:19:34 2010
New Revision: 961033
URL: http://svn.apache.org/viewvc?rev=961033&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-254
Adds a basic FailoverTransport and Inactivity monitor. Also provides a simple ConnectionStateTracker that can track the Connection and restore all its Consumer subscriptions once a successful failover is completed.
Added:
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/KeepAliveInfo.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/IOException.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ICompositeTransport.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs (with props)
Modified:
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructureTypes.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFactory.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MapMessageTest.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.csproj
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.sln
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs?rev=961033&r1=961032&r2=961033&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs Tue Jul 6 22:19:34 2010
@@ -16,6 +16,7 @@
*/
using System;
+using Apache.NMS.Stomp.State;
namespace Apache.NMS.Stomp.Commands
{
@@ -200,6 +201,14 @@ namespace Apache.NMS.Stomp.Commands
}
}
+ /// <summary>
+ /// Reports whether this command is a keep-alive. The base implementation
+ /// always answers false; subclasses may override it.
+ /// </summary>
+ public virtual bool IsKeepAliveInfo
+ {
+     get { return false; }
+ }
+
public virtual bool ResponseRequired
{
get
@@ -212,6 +221,11 @@ namespace Apache.NMS.Stomp.Commands
}
}
+ /// <summary>
+ /// Visitor entry point. This base implementation never dispatches to the
+ /// visitor; it unconditionally throws, so it is only useful when overridden.
+ /// </summary>
+ /// <param name="visitor">The visitor; not used by this implementation.</param>
+ /// <returns>Never returns normally.</returns>
+ /// <exception cref="ApplicationException">Always thrown.</exception>
+ public virtual Response visit(ICommandVisitor visitor)
+ {
+ throw new ApplicationException("BaseCommand.Visit() not implemented");
+ }
+
public override Object Clone()
{
// Since we are a derived class use the base's Clone()
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs?rev=961033&r1=961032&r2=961033&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs Tue Jul 6 22:19:34 2010
@@ -16,6 +16,7 @@
*/
using System;
+using Apache.NMS.Stomp.State;
namespace Apache.NMS.Stomp.Commands
{
@@ -81,6 +82,17 @@ namespace Apache.NMS.Stomp.Commands
get;
}
+ /// <summary>
+ /// True when the command is a keep-alive command.
+ /// </summary>
+ bool IsKeepAliveInfo
+ {
+ get;
+ }
+
+ /// <summary>
+ /// True when the command is a shutdown command.
+ /// </summary>
+ bool IsShutdownInfo
+ {
+ get;
+ }
+
+ /// <summary>
+ /// Lets a visitor process this command and produce a response.
+ /// </summary>
+ /// <param name="visitor">The visitor to hand this command to.</param>
+ /// <returns>The response produced for this command.</returns>
+ Response visit(ICommandVisitor visitor);
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructureTypes.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructureTypes.cs?rev=961033&r1=961032&r2=961033&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructureTypes.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructureTypes.cs Tue Jul 6 22:19:34 2010
@@ -49,6 +49,7 @@ namespace Apache.NMS.Stomp.Commands
public const byte RemoveInfoType = 25;
public const byte RemoveSubscriptionInfoType = 26;
public const byte ErrorResponseType = 27;
+ public const byte KeepAliveInfoType = 28;
public const byte DestinationType = 48;
public const byte TempDestinationType = 49;
@@ -128,6 +129,9 @@ namespace Apache.NMS.Stomp.Commands
case ErrorResponseType:
packetTypeStr = "ErrorResponseType";
break;
+ case KeepAliveInfoType:
+ packetTypeStr = "KeepAliveInfoType";
+ break;
case DestinationType:
packetTypeStr = "DestinationType";
break;
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/KeepAliveInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/KeepAliveInfo.cs?rev=961033&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/KeepAliveInfo.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/KeepAliveInfo.cs Tue Jul 6 22:19:34 2010
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+
+using Apache.NMS.Stomp.State;
+
+namespace Apache.NMS.Stomp.Commands
+{
+ /// <summary>
+ /// Command identified by DataStructureTypes.KeepAliveInfoType that reports
+ /// itself as a keep-alive and dispatches to the visitor's
+ /// processKeepAliveInfo method.
+ /// </summary>
+ public class KeepAliveInfo : BaseCommand
+ {
+     /// <summary>
+     /// Get the unique identifier that this object and its own
+     /// Marshaler share.
+     /// </summary>
+     /// <returns>DataStructureTypes.KeepAliveInfoType.</returns>
+     public override byte GetDataStructureType()
+     {
+         return DataStructureTypes.KeepAliveInfoType;
+     }
+
+     /// <summary>
+     /// Returns a string containing the information for this DataStructure
+     /// such as its type and value of its elements.
+     /// </summary>
+     public override string ToString()
+     {
+         // No separator after the last element, so the text ends with
+         // "... = <value> ]" rather than a dangling ",  ]".
+         return GetType().Name + "[ " +
+             "commandId = " + this.CommandId + ", " +
+             "responseRequired = " + this.ResponseRequired + " ]";
+     }
+
+     /// <summary>
+     /// Always true: this command is a keep-alive.
+     /// </summary>
+     public override bool IsKeepAliveInfo
+     {
+         get
+         {
+             return true;
+         }
+     }
+
+     /// <summary>
+     /// Allows a Visitor to visit this command and return a response to the
+     /// command based on the command type being visited. This command calls
+     /// the visitor's processKeepAliveInfo method.
+     /// </summary>
+     /// <param name="visitor">The visitor that processes this command.</param>
+     /// <returns>Whatever processKeepAliveInfo returns.</returns>
+     public override Response visit(ICommandVisitor visitor)
+     {
+         return visitor.processKeepAliveInfo( this );
+     }
+
+ };
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/KeepAliveInfo.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/KeepAliveInfo.cs
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/IOException.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/IOException.cs?rev=961033&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/IOException.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/IOException.cs Tue Jul 6 22:19:34 2010
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+using Apache.NMS.Stomp.Commands;
+using System.Text;
+using Apache.NMS;
+
+namespace Apache.NMS.Stomp
+{
+
+ /// <summary>
+ /// NMS exception type used to report I/O failures.
+ /// </summary>
+ public class IOException : NMSException
+ {
+     /// <summary>
+     /// Creates the exception with a fixed default message.
+     /// </summary>
+     public IOException() : base("IO Exception failed with missing exception log")
+     {
+     }
+
+     /// <summary>
+     /// Creates the exception with the supplied message.
+     /// </summary>
+     /// <param name="msg">Text describing the failure.</param>
+     public IOException(string msg) : base(msg)
+     {
+     }
+
+     /// <summary>
+     /// Creates the exception with the supplied message and underlying cause.
+     /// </summary>
+     /// <param name="msg">Text describing the failure.</param>
+     /// <param name="inner">The exception that caused this one.</param>
+     public IOException(string msg, Exception inner) : base(msg, inner)
+     {
+     }
+ }
+}
+
+
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/IOException.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/IOException.cs
------------------------------------------------------------------------------
svn:executable = *
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs?rev=961033&r1=961032&r2=961033&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs Tue Jul 6 22:19:34 2010
@@ -93,6 +93,10 @@ namespace Apache.NMS.Stomp.Protocol
{
WriteRemoveInfo((RemoveInfo) o, dataOut);
}
+ else if(o is KeepAliveInfo)
+ {
+ WriteKeepAliveInfo((KeepAliveInfo) o, dataOut);
+ }
else if(o is Command)
{
Command command = o as Command;
@@ -435,7 +439,7 @@ namespace Apache.NMS.Stomp.Protocol
}
else
{
- // frame.SetProperty("transformation", "jms-map-xml");
+ frame.SetProperty("transformation", "jms-xml");
}
frame.SetProperty("activemq.dispatchAsync", command.DispatchAsync);
@@ -465,6 +469,11 @@ namespace Apache.NMS.Stomp.Protocol
frame.ToStream(dataOut);
}
+ /// <summary>
+ /// Writes a keep-alive to the stream as a single newline byte. The command
+ /// itself is not inspected.
+ /// </summary>
+ /// <param name="command">The keep-alive command being written (unused).</param>
+ /// <param name="dataOut">Writer that receives the newline byte.</param>
+ protected virtual void WriteKeepAliveInfo(KeepAliveInfo command, BinaryWriter dataOut)
+ {
+     const byte newline = (byte) '\n';
+     dataOut.Write(newline);
+ }
+
protected virtual void WriteRemoveInfo(RemoveInfo command, BinaryWriter dataOut)
{
StompFrame frame = new StompFrame("UNSUBSCRIBE");
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=961033&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Tue Jul 6 22:19:34 2010
@@ -0,0 +1,903 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Threading;
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Stomp.State;
+using Apache.NMS.Stomp.Threads;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Stomp.Transport.Failover
+{
+ /// <summary>
+ /// A Transport that is made reliable by being able to fail over to another
+ /// transport when a transport failure is detected.
+ /// </summary>
+ public class FailoverTransport : ICompositeTransport, IComparable
+ {
+ private static int idCounter = 0;
+ private int id;
+
+ private bool disposed;
+ private bool connected;
+ private List<Uri> uris = new List<Uri>();
+ private CommandHandler commandHandler;
+ private ExceptionHandler exceptionHandler;
+ private InterruptedHandler interruptedHandler;
+ private ResumedHandler resumedHandler;
+
+ private Mutex reconnectMutex = new Mutex();
+ private Mutex sleepMutex = new Mutex();
+ private ConnectionStateTracker stateTracker = new ConnectionStateTracker();
+ private Dictionary<int, Command> requestMap = new Dictionary<int, Command>();
+
+ private Uri connectedTransportURI;
+ private Uri failedConnectTransportURI;
+ private AtomicReference<ITransport> connectedTransport = new AtomicReference<ITransport>(null);
+ private TaskRunner reconnectTask = null;
+ private bool started;
+
+ private int timeout = -1;
+ private int initialReconnectDelay = 10;
+ private int maxReconnectDelay = 1000 * 30;
+ private int backOffMultiplier = 2;
+ private bool useExponentialBackOff = true;
+ private bool randomize = true;
+ private int maxReconnectAttempts;
+ private int connectFailures;
+ private int reconnectDelay = 10;
+ private Exception connectionFailure;
+ private bool firstConnection = true;
+ private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
+ private volatile Exception failure;
+ private readonly object mutex = new object();
+
+ /// <summary>
+ /// Creates an unstarted transport and assigns it the next sequential id,
+ /// which CompareTo uses for ordering.
+ /// </summary>
+ public FailoverTransport()
+ {
+     // Interlocked keeps ids unique when transports are constructed on
+     // several threads at once. Subtracting one keeps the zero-based
+     // numbering that the previous post-increment produced.
+     id = Interlocked.Increment(ref idCounter) - 1;
+ }
+
+ /// <summary>
+ /// Finalizer: marks the transport disposed via Dispose(false).
+ /// </summary>
+ ~FailoverTransport()
+ {
+     Dispose(false);
+ }
+
+ #region FailoverTask
+
+ /// <summary>
+ /// Task run by the reconnect task runner; each iteration makes one
+ /// connection attempt on behalf of the owning transport when one is needed.
+ /// </summary>
+ private class FailoverTask : Task
+ {
+     private FailoverTransport parent;
+
+     public FailoverTask(FailoverTransport p)
+     {
+         parent = p;
+     }
+
+     /// <summary>
+     /// Calls the owner's DoConnect when the owner is not disposed, has no
+     /// recorded connection failure and has no connected transport.
+     /// </summary>
+     /// <returns>
+     /// The result of DoConnect, or false when no attempt was made.
+     /// </returns>
+     public bool Iterate()
+     {
+         if(parent.disposed || parent.connectionFailure != null)
+         {
+             return false;
+         }
+
+         if(parent.ConnectedTransport != null)
+         {
+             return false;
+         }
+
+         return parent.DoConnect();
+     }
+ }
+
+ #endregion
+
+ #region Property Accessors
+
+ /// <summary>
+ /// Handler that OnCommand forwards every received command to.
+ /// </summary>
+ public CommandHandler Command
+ {
+     get { return this.commandHandler; }
+     set { this.commandHandler = value; }
+ }
+
+ /// <summary>
+ /// Handler that DoConnect notifies once the reconnect attempt limit is reached.
+ /// </summary>
+ public ExceptionHandler Exception
+ {
+     get { return this.exceptionHandler; }
+     set { this.exceptionHandler = value; }
+ }
+
+ /// <summary>
+ /// Handler notified by HandleTransportFailure after the connected
+ /// transport has been torn down.
+ /// </summary>
+ public InterruptedHandler Interrupted
+ {
+     get { return this.interruptedHandler; }
+     set { this.interruptedHandler = value; }
+ }
+
+ /// <summary>
+ /// Handler notified by DoConnect after a new transport has been started.
+ /// </summary>
+ public ResumedHandler Resumed
+ {
+     get { return this.resumedHandler; }
+     set { this.resumedHandler = value; }
+ }
+
+ /// <summary>
+ /// Most recent failure recorded while connecting. Writes are serialized
+ /// on the private mutex object; reads go straight to the volatile field.
+ /// </summary>
+ internal Exception Failure
+ {
+     get { return this.failure; }
+     set
+     {
+         lock(this.mutex)
+         {
+             this.failure = value;
+         }
+     }
+ }
+
+ /// <summary>
+ /// Upper bound, in milliseconds, on how long Oneway waits for a transport
+ /// to reconnect. Values of zero or less disable the limit.
+ /// </summary>
+ public int Timeout
+ {
+     get { return this.timeout; }
+     set { this.timeout = value; }
+ }
+
+ /// <summary>
+ /// Request timeout value held by this transport.
+ /// </summary>
+ public TimeSpan RequestTimeout
+ {
+     get { return this.requestTimeout; }
+     set { this.requestTimeout = value; }
+ }
+
+ /// <summary>
+ /// Value that ReconnectDelay is reset to by DoConnect.
+ /// </summary>
+ public int InitialReconnectDelay
+ {
+     get { return this.initialReconnectDelay; }
+     set { this.initialReconnectDelay = value; }
+ }
+
+ /// <summary>
+ /// Ceiling applied to ReconnectDelay when exponential back-off is in use.
+ /// </summary>
+ public int MaxReconnectDelay
+ {
+     get { return this.maxReconnectDelay; }
+     set { this.maxReconnectDelay = value; }
+ }
+
+ /// <summary>
+ /// Time, in milliseconds, that DoConnect sleeps before the next attempt.
+ /// </summary>
+ public int ReconnectDelay
+ {
+     get { return this.reconnectDelay; }
+     set { this.reconnectDelay = value; }
+ }
+
+ /// <summary>
+ /// Factor ReconnectDelay is multiplied by after each failed attempt when
+ /// exponential back-off is in use.
+ /// </summary>
+ public int ReconnectDelayExponent
+ {
+     get { return this.backOffMultiplier; }
+     set { this.backOffMultiplier = value; }
+ }
+
+ /// <summary>
+ /// The currently connected transport, or null when there is none.
+ /// </summary>
+ public ITransport ConnectedTransport
+ {
+     get { return this.connectedTransport.Value; }
+     set { this.connectedTransport.Value = value; }
+ }
+
+ /// <summary>
+ /// Address recorded for the currently connected transport.
+ /// </summary>
+ public Uri ConnectedTransportURI
+ {
+     get { return this.connectedTransportURI; }
+     set { this.connectedTransportURI = value; }
+ }
+
+ /// <summary>
+ /// Number of failed connect rounds after which DoConnect gives up.
+ /// Values of zero or less disable the limit.
+ /// </summary>
+ public int MaxReconnectAttempts
+ {
+     get { return this.maxReconnectAttempts; }
+     set { this.maxReconnectAttempts = value; }
+ }
+
+ /// <summary>
+ /// When true, the candidate URI list is shuffled before each connect round.
+ /// </summary>
+ public bool Randomize
+ {
+     get { return this.randomize; }
+     set { this.randomize = value; }
+ }
+
+ /// <summary>
+ /// When true, ReconnectDelay grows after each failed round; when false it
+ /// is reset to InitialReconnectDelay at the start of each round.
+ /// </summary>
+ public bool UseExponentialBackOff
+ {
+     get { return this.useExponentialBackOff; }
+     set { this.useExponentialBackOff = value; }
+ }
+
+ #endregion
+
+ /// <summary>
+ /// Always true for this transport.
+ /// </summary>
+ public bool IsFaultTolerant
+ {
+ get { return true; }
+ }
+
+ /// <summary>
+ /// True once Stop or Dispose has marked the transport disposed.
+ /// </summary>
+ public bool IsDisposed
+ {
+ get { return disposed; }
+ }
+
+ /// <summary>
+ /// True after DoConnect has established a transport, until a failure or
+ /// Stop clears the flag.
+ /// </summary>
+ public bool IsConnected
+ {
+ get { return connected; }
+ }
+
+ /// <summary>
+ /// True between a call to Start and the following call to Stop.
+ /// </summary>
+ public bool IsStarted
+ {
+ get { return started; }
+ }
+
+ /// <summary>
+ /// Tests whether a command is one of those sent while a connection is
+ /// being closed: a shutdown command or a RemoveInfo.
+ /// </summary>
+ /// <param name="command">The command to test; may be null.</param>
+ /// <returns>Returns true if the command is one sent when a connection is being closed.</returns>
+ private bool IsShutdownCommand(Command command)
+ {
+     if(command == null)
+     {
+         return false;
+     }
+
+     return command.IsShutdownInfo || command is RemoveInfo;
+ }
+
+ /// <summary>
+ /// Exception callback installed on the wrapped transport. Routes the error
+ /// to HandleTransportFailure so a reconnect can be scheduled.
+ /// </summary>
+ /// <param name="sender">The transport that reported the error.</param>
+ /// <param name="error">The error that was reported.</param>
+ public void OnException(ITransport sender, Exception error)
+ {
+     try
+     {
+         HandleTransportFailure(error);
+     }
+     catch(Exception e)
+     {
+         // Still swallowed so the reporting transport's thread is not torn
+         // down, but recorded so the failure is no longer invisible.
+         Tracer.ErrorFormat("Failed to handle transport failure: {0}", e.Message);
+     }
+ }
+
+ /// <summary>
+ /// No-op command handler. HandleTransportFailure installs it on a failed
+ /// transport so that commands it still delivers are discarded.
+ /// </summary>
+ public void disposedOnCommand(ITransport sender, Command c)
+ {
+ }
+
+ /// <summary>
+ /// No-op exception handler. HandleTransportFailure installs it on a failed
+ /// transport so that errors it still reports are discarded.
+ /// </summary>
+ public void disposedOnException(ITransport sender, Exception e)
+ {
+ }
+
+ /// <summary>
+ /// Tears down the currently connected transport after a failure and, when
+ /// this transport is started, schedules a reconnect. Does nothing when no
+ /// transport is currently connected.
+ /// </summary>
+ /// <param name="e">The error that caused the failure.</param>
+ public void HandleTransportFailure(Exception e)
+ {
+     // Atomically take ownership of the transport so that concurrent
+     // failure reports only tear it down once.
+     ITransport transport = connectedTransport.GetAndSet(null);
+     if(transport == null)
+     {
+         return;
+     }
+
+     // Detach our handlers so the dying transport can no longer call back.
+     transport.Command = new CommandHandler(disposedOnCommand);
+     transport.Exception = new ExceptionHandler(disposedOnException);
+     try
+     {
+         transport.Stop();
+     }
+     catch(Exception ex)
+     {
+         // Errors while stopping a failed transport are not actionable.
+         Tracer.DebugFormat("Ignoring error while stopping failed transport: {0}", ex.Message);
+     }
+
+     lock(reconnectMutex)
+     {
+         bool reconnectOk = started;
+         if(reconnectOk)
+         {
+             // The Uri is passed as-is: it is null when the transport was
+             // assigned through the ConnectedTransport setter, and
+             // formatting tolerates null where ToString() would throw.
+             Tracer.WarnFormat("Transport failed to {0}, attempting to automatically reconnect due to: {1}", ConnectedTransportURI, e.Message);
+         }
+
+         failedConnectTransportURI = ConnectedTransportURI;
+         ConnectedTransportURI = null;
+         connected = false;
+         if(reconnectOk)
+         {
+             // Reconnect() creates the reconnect task when it does not
+             // exist yet and then wakes it; calling Wakeup() on the field
+             // directly fails when the task was never created.
+             Reconnect();
+         }
+     }
+
+     // Read the handler once so it cannot become null between the check
+     // and the call.
+     InterruptedHandler handler = this.Interrupted;
+     if(handler != null)
+     {
+         handler(transport);
+     }
+ }
+
+ /// <summary>
+ /// Starts the transport. When a transport is already connected its state
+ /// is restored through the state tracker; otherwise a reconnect is
+ /// requested. Calling Start on a started transport only logs a message.
+ /// </summary>
+ public void Start()
+ {
+     lock(reconnectMutex)
+     {
+         if(!started)
+         {
+             Tracer.Debug("FailoverTransport Started.");
+             started = true;
+
+             if(ConnectedTransport == null)
+             {
+                 Reconnect();
+             }
+             else
+             {
+                 stateTracker.DoRestore(ConnectedTransport);
+             }
+
+             return;
+         }
+
+         Tracer.Debug("FailoverTransport Already Started.");
+     }
+ }
+
+ /// <summary>
+ /// Stops the transport: clears the started and connected flags, marks the
+ /// transport disposed, shuts down the reconnect task and stops the
+ /// connected transport, if any. Calling Stop on a transport that is not
+ /// started only logs a message.
+ /// </summary>
+ public virtual void Stop()
+ {
+ ITransport transportToStop = null;
+
+ lock(reconnectMutex)
+ {
+ if(!started)
+ {
+ Tracer.Debug("FailoverTransport Already Stopped.");
+ return;
+ }
+
+ Tracer.Debug("FailoverTransport Stopped.");
+ started = false;
+ // Setting disposed ends the retry loops in Oneway and DoConnect.
+ disposed = true;
+ connected = false;
+
+ if(ConnectedTransport != null)
+ {
+ // Detach the transport here; it is stopped after the lock is released.
+ transportToStop = connectedTransport.GetAndSet(null);
+ }
+ }
+
+ // NOTE(review): DoConnect guards its sleep with lock(sleepMutex), a monitor
+ // lock on the object, whereas this acquires the Mutex itself through
+ // WaitOne/ReleaseMutex. The two do not exclude each other, so this does
+ // not appear to wait for a sleeping reconnect attempt - confirm intent.
+ try
+ {
+ sleepMutex.WaitOne();
+ }
+ finally
+ {
+ sleepMutex.ReleaseMutex();
+ }
+
+ if(reconnectTask != null)
+ {
+ reconnectTask.Shutdown();
+ }
+
+ if(transportToStop != null)
+ {
+ transportToStop.Stop();
+ }
+ }
+
+ /// <summary>
+ /// Not supported by this transport.
+ /// </summary>
+ /// <exception cref="ApplicationException">Always thrown.</exception>
+ public FutureResponse AsyncRequest(Command command)
+ {
+ throw new ApplicationException("FailoverTransport does not implement AsyncRequest(Command)");
+ }
+
+ /// <summary>
+ /// Not supported by this transport.
+ /// </summary>
+ /// <exception cref="ApplicationException">Always thrown.</exception>
+ public Response Request(Command command)
+ {
+ throw new ApplicationException("FailoverTransport does not implement Request(Command)");
+ }
+
+ /// <summary>
+ /// Not supported by this transport.
+ /// </summary>
+ /// <exception cref="ApplicationException">Always thrown.</exception>
+ public Response Request(Command command, TimeSpan ts)
+ {
+ throw new ApplicationException("FailoverTransport does not implement Request(Command, TimeSpan)");
+ }
+
+ /// <summary>
+ /// Command callback installed on the wrapped transport. For a response,
+ /// the matching entry is removed from the pending request map and, when
+ /// that entry is a Tracked command, it is told the response arrived. The
+ /// command is then forwarded to the registered Command handler.
+ /// </summary>
+ /// <param name="sender">The transport that delivered the command.</param>
+ /// <param name="command">The received command; may be null.</param>
+ public void OnCommand(ITransport sender, Command command)
+ {
+     if(command != null && command.IsResponse)
+     {
+         int correlationId = ((Response) command).CorrelationId;
+         Command pending = null;
+
+         lock(((ICollection) requestMap).SyncRoot)
+         {
+             // Single lookup instead of ContainsKey followed by the indexer.
+             if(requestMap.TryGetValue(correlationId, out pending))
+             {
+                 requestMap.Remove(correlationId);
+             }
+         }
+
+         Tracked tracked = pending as Tracked;
+         if(tracked != null)
+         {
+             tracked.onResponses();
+         }
+     }
+
+     // Read the handler once and skip the call when none is registered,
+     // rather than failing with a NullReferenceException.
+     CommandHandler handler = this.Command;
+     if(handler != null)
+     {
+         handler(sender, command);
+     }
+ }
+
+ /// <summary>
+ /// Sends a command over the connected transport, waiting for a transport
+ /// to become available and retrying after transport failures until the
+ /// command is sent or this transport is disposed.
+ /// </summary>
+ /// <remarks>
+ /// Shutdown commands are not sent when no transport is connected: a
+ /// shutdown command is dropped, and a RemoveInfo is answered locally with
+ /// a simulated Response.
+ /// </remarks>
+ /// <param name="command">The command to send.</param>
+ /// <exception cref="IOException">
+ /// Thrown when waiting for a transport exceeds Timeout, or when the wait
+ /// ends without a transport for an unexpected reason. A recorded
+ /// connection failure is rethrown as-is.
+ /// </exception>
+ public void Oneway(Command command)
+ {
+     Exception error = null;
+
+     lock(reconnectMutex)
+     {
+         if(IsShutdownCommand(command) && ConnectedTransport == null)
+         {
+             if(command.IsShutdownInfo)
+             {
+                 // Skipping send of ShutdownInfo command when not connected.
+                 return;
+             }
+
+             if(command is RemoveInfo)
+             {
+                 // Simulate response to RemoveInfo command
+                 Response response = new Response();
+                 response.CorrelationId = command.CommandId;
+                 OnCommand(this, response);
+                 return;
+             }
+         }
+
+         // Keep trying until the message is sent.
+         for(int i = 0; !disposed; i++)
+         {
+             try
+             {
+                 // Wait for transport to be connected.
+                 ITransport transport = ConnectedTransport;
+                 DateTime start = DateTime.Now;
+                 bool timedout = false;
+                 while(transport == null && !disposed && connectionFailure == null)
+                 {
+                     Tracer.Info("Waiting for transport to reconnect.");
+
+                     int elapsed = (int)(DateTime.Now - start).TotalMilliseconds;
+                     if( this.timeout > 0 && elapsed > timeout )
+                     {
+                         timedout = true;
+                         Tracer.DebugFormat("FailoverTransport.oneway - timed out after {0} mills", elapsed );
+                         break;
+                     }
+
+                     // Release so that the reconnect task can run
+                     try
+                     {
+                         // Wait for something
+                         Monitor.Wait(reconnectMutex, 1000);
+                     }
+                     catch(ThreadInterruptedException e)
+                     {
+                         Tracer.DebugFormat("Interrupted: {0}", e.Message);
+                     }
+
+                     transport = ConnectedTransport;
+                 }
+
+                 if(transport == null)
+                 {
+                     // Previous loop may have exited due to use being disposed.
+                     if(disposed)
+                     {
+                         error = new IOException("Transport disposed.");
+                     }
+                     else if(connectionFailure != null)
+                     {
+                         error = connectionFailure;
+                     }
+                     else if(timedout)
+                     {
+                         error = new IOException("Failover oneway timed out after "+ timeout +" milliseconds.");
+                     }
+                     else
+                     {
+                         error = new IOException("Unexpected failure.");
+                     }
+                     break;
+                 }
+
+                 // If it was a request and it was not being tracked by
+                 // the state tracker, then hold it in the requestMap so
+                 // that we can replay it later.
+                 Tracked tracked = stateTracker.track(command);
+                 lock(((ICollection) requestMap).SyncRoot)
+                 {
+                     // The indexer overwrites an existing entry, whereas
+                     // Add() throws ArgumentException when the command id
+                     // is already present.
+                     if(tracked != null && tracked.WaitingForResponse)
+                     {
+                         requestMap[command.CommandId] = tracked;
+                     }
+                     else if(tracked == null && command.ResponseRequired)
+                     {
+                         requestMap[command.CommandId] = command;
+                     }
+                 }
+
+                 // Send the message.
+                 try
+                 {
+                     transport.Oneway(command);
+                     stateTracker.trackBack(command);
+                 }
+                 catch(Exception)
+                 {
+                     // If the command was not tracked.. we will retry in
+                     // this method
+                     if(tracked == null)
+                     {
+                         // since we will retry in this method.. take it
+                         // out of the request map so that it is not
+                         // sent 2 times on recovery
+                         if(command.ResponseRequired)
+                         {
+                             lock(((ICollection) requestMap).SyncRoot)
+                             {
+                                 requestMap.Remove(command.CommandId);
+                             }
+                         }
+
+                         // Rethrow so the outer catch handles it; a bare
+                         // throw keeps the original stack trace.
+                         throw;
+                     }
+
+                     // A tracked command is not retried here; the method
+                     // returns below with the error ignored.
+                 }
+
+                 return;
+             }
+             catch(Exception e)
+             {
+                 Tracer.DebugFormat("Send Oneway attempt: {0} failed: Message = {1}", i, e.Message);
+                 Tracer.DebugFormat("Failed Message Was: {0}", command);
+                 HandleTransportFailure(e);
+             }
+         }
+     }
+
+     if(!disposed)
+     {
+         if(error != null)
+         {
+             throw error;
+         }
+     }
+ }
+
+ /// <summary>
+ /// Adds each URI that is not already known to the candidate list, then
+ /// requests a reconnect.
+ /// </summary>
+ /// <param name="u">The URIs to add.</param>
+ public void Add(Uri[] u)
+ {
+     lock(uris)
+     {
+         foreach(Uri candidate in u)
+         {
+             if(uris.Contains(candidate))
+             {
+                 continue;
+             }
+
+             uris.Add(candidate);
+         }
+     }
+
+     Reconnect();
+ }
+
+ /// <summary>
+ /// Removes each given URI from the candidate list, then requests a
+ /// reconnect.
+ /// </summary>
+ /// <param name="u">The URIs to remove.</param>
+ public void Remove(Uri[] u)
+ {
+     lock(uris)
+     {
+         foreach(Uri candidate in u)
+         {
+             uris.Remove(candidate);
+         }
+     }
+
+     Reconnect();
+ }
+
+ /// <summary>
+ /// Parses the text as a URI, adds it to the candidate list when it is not
+ /// already present, and requests a reconnect. Any exception raised along
+ /// the way is logged and not propagated.
+ /// </summary>
+ /// <param name="u">Text form of the URI to add.</param>
+ public void Add(String u)
+ {
+     try
+     {
+         Uri parsed = new Uri(u);
+
+         lock(uris)
+         {
+             bool known = uris.Contains(parsed);
+             if(!known)
+             {
+                 uris.Add(parsed);
+             }
+         }
+
+         Reconnect();
+     }
+     catch(Exception e)
+     {
+         Tracer.ErrorFormat("Failed to parse URI '{0}': {1}", u, e.Message);
+     }
+ }
+
+ /// <summary>
+ /// Adds the URI to the candidate list, which in turn requests a reconnect.
+ /// </summary>
+ /// <param name="uri">The URI to add.</param>
+ public void Reconnect(Uri uri)
+ {
+     Uri[] single = new Uri[] { uri };
+     Add(single);
+ }
+
+ /// <summary>
+ /// Wakes the reconnect task, creating it on first use. When the transport
+ /// has not been started yet this only logs a message.
+ /// </summary>
+ public void Reconnect()
+ {
+     lock(reconnectMutex)
+     {
+         if(!started)
+         {
+             Tracer.Debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
+             return;
+         }
+
+         if(reconnectTask == null)
+         {
+             Tracer.Debug("Creating reconnect task");
+             reconnectTask = new DedicatedTaskRunner(new FailoverTask(this));
+         }
+
+         Tracer.Debug("Waking up reconnect task");
+         try
+         {
+             reconnectTask.Wakeup();
+         }
+         catch(ThreadInterruptedException)
+         {
+             // An interrupt while waking the task is ignored.
+         }
+     }
+ }
+
+ /// <summary>
+ /// Builds the ordered list of URIs for the next connect round: a copy of
+ /// the candidate list, shuffled when Randomize is set, with the URI that
+ /// failed most recently moved to the end so it is tried last.
+ /// </summary>
+ private List<Uri> ConnectList
+ {
+     get
+     {
+         // Copy under the same lock Add/Remove hold while mutating the
+         // list, so the copy cannot observe a modification in progress.
+         List<Uri> l;
+         lock(uris)
+         {
+             l = new List<Uri>(uris);
+         }
+
+         // Read the field once so the URI removed here is the one put back.
+         Uri lastFailed = failedConnectTransportURI;
+         bool removed = false;
+         if(lastFailed != null)
+         {
+             removed = l.Remove(lastFailed);
+         }
+
+         if(Randomize)
+         {
+             // Randomly, reorder the list by random swapping
+             Random r = new Random(DateTime.Now.Millisecond);
+             for(int i = 0; i < l.Count; i++)
+             {
+                 int p = r.Next(l.Count);
+                 Uri t = l[p];
+                 l[p] = l[i];
+                 l[i] = t;
+             }
+         }
+
+         if(removed)
+         {
+             l.Add(lastFailed);
+         }
+
+         return l;
+     }
+ }
+
+ /// <summary>
+ /// Brings a newly connected transport up to date: starts it, has the state
+ /// tracker restore its state, then resends every command still held in
+ /// the pending request map.
+ /// </summary>
+ /// <param name="t">The transport to restore onto.</param>
+ protected void RestoreTransport(ITransport t)
+ {
+     Tracer.Info("Restoring previous transport connection.");
+     t.Start();
+
+     stateTracker.DoRestore(t);
+
+     Tracer.Info("Sending queued commands...");
+
+     // Snapshot the pending commands under the lock; resend outside it.
+     List<Command> pending;
+     lock(((ICollection) requestMap).SyncRoot)
+     {
+         pending = new List<Command>(requestMap.Values);
+     }
+
+     for(int index = 0; index < pending.Count; index++)
+     {
+         t.Oneway(pending[index]);
+     }
+ }
+
+ /// <summary>
+ /// Remote address of the currently connected transport, or null when no
+ /// transport is connected.
+ /// </summary>
+ public Uri RemoteAddress
+ {
+     get
+     {
+         // Read once: a failure on another thread can clear the connected
+         // transport between a null check and a second read.
+         ITransport transport = ConnectedTransport;
+         if(transport != null)
+         {
+             return transport.RemoteAddress;
+         }
+
+         return null;
+     }
+ }
+
+ /// <summary>
+ /// Returns this transport when its runtime type equals the requested type;
+ /// otherwise delegates to the connected transport, if there is one.
+ /// </summary>
+ /// <param name="type">The transport type being looked for.</param>
+ /// <returns>
+ /// The matching transport, or null when this transport does not match and
+ /// no transport is connected.
+ /// </returns>
+ public Object Narrow(Type type)
+ {
+     if(this.GetType().Equals(type))
+     {
+         return this;
+     }
+
+     // Read once: a failure on another thread can clear the connected
+     // transport between a null check and a second read.
+     ITransport transport = ConnectedTransport;
+     if(transport != null)
+     {
+         return transport.Narrow(type);
+     }
+
+     return null;
+ }
+
+ /// <summary>
+ /// Performs one connect round: tries each URI from ConnectList in order
+ /// until one connects, installs this transport's handlers on the new
+ /// transport, restores state, and publishes it as the connected transport.
+ /// When the round fails, sleeps for ReconnectDelay and, with exponential
+ /// back-off enabled, increases the delay up to MaxReconnectDelay.
+ /// </summary>
+ /// <returns>
+ /// False when nothing further should be attempted right now (already
+ /// connected, disposed, a connection failure is recorded, the connect
+ /// succeeded, or the attempt limit was reached); otherwise true unless the
+ /// transport was disposed in the meantime.
+ /// </returns>
+ private bool DoConnect()
+ {
+ lock(reconnectMutex)
+ {
+ if(ConnectedTransport != null || disposed || connectionFailure != null)
+ {
+ return false;
+ }
+ else
+ {
+ List<Uri> connectList = ConnectList;
+ if(connectList.Count == 0)
+ {
+ Failure = new NMSConnectionException("No URIs available for connection.");
+ }
+ else
+ {
+ // Without exponential back-off every round starts from the initial delay.
+ if(!UseExponentialBackOff)
+ {
+ ReconnectDelay = InitialReconnectDelay;
+ }
+
+ ITransport transport = null;
+ Uri chosenUri = null;
+
+ try
+ {
+ foreach(Uri uri in connectList)
+ {
+ if(ConnectedTransport != null || disposed)
+ {
+ break;
+ }
+
+ Tracer.DebugFormat("Attempting connect to: {0}", uri);
+
+ // synchronous connect
+ try
+ {
+ Tracer.DebugFormat("Attempting connect to: {0}", uri.ToString());
+ transport = TransportFactory.CompositeConnect(uri);
+ chosenUri = transport.RemoteAddress;
+ break;
+ }
+ catch(Exception e)
+ {
+ // Remember the most recent failure and move on to the next URI.
+ Failure = e;
+ Tracer.DebugFormat("Connect fail to: {0}, reason: {1}", uri, e.Message);
+ }
+ }
+
+ if(transport != null)
+ {
+ transport.Command = new CommandHandler(OnCommand);
+ transport.Exception = new ExceptionHandler(OnException);
+ transport.Start();
+
+ if(started)
+ {
+ RestoreTransport(transport);
+ }
+
+ if(this.Resumed != null)
+ {
+ this.Resumed(transport);
+ }
+
+ Tracer.Debug("Connection established");
+ ReconnectDelay = InitialReconnectDelay;
+ // The URI is recorded before the transport is published.
+ ConnectedTransportURI = chosenUri;
+ ConnectedTransport = transport;
+ connectFailures = 0;
+ connected = true;
+
+ if(firstConnection)
+ {
+ firstConnection = false;
+ Tracer.InfoFormat("Successfully connected to: {0}", chosenUri.ToString());
+ }
+ else
+ {
+ Tracer.InfoFormat("Successfully reconnected to: {0}", chosenUri.ToString());
+ }
+
+ // Connected: false tells the caller no further iteration is needed.
+ return false;
+ }
+ }
+ catch(Exception e)
+ {
+ // Covers failures while starting or restoring the new transport.
+ Failure = e;
+ Tracer.DebugFormat("Connect attempt failed. Reason: {0}", e.Message);
+ }
+ }
+
+ // The attempt limit only applies when MaxReconnectAttempts is positive.
+ if(MaxReconnectAttempts > 0 && ++connectFailures >= MaxReconnectAttempts)
+ {
+ Tracer.ErrorFormat("Failed to connect to transport after {0} attempt(s)", connectFailures);
+ connectionFailure = Failure;
+ // NOTE(review): the Exception handler is invoked without a null check.
+ this.Exception(this, connectionFailure);
+ return false;
+ }
+ }
+ }
+
+ if(!disposed)
+ {
+ Tracer.DebugFormat("Waiting {0}ms before attempting connection.", ReconnectDelay);
+ // The sleep happens after reconnectMutex has been released.
+ lock(sleepMutex)
+ {
+ try
+ {
+ Thread.Sleep(ReconnectDelay);
+ }
+ catch(ThreadInterruptedException)
+ {
+ }
+ }
+
+ if(UseExponentialBackOff)
+ {
+ // Exponential increment of reconnect delay.
+ ReconnectDelay *= ReconnectDelayExponent;
+ if(ReconnectDelay > MaxReconnectDelay)
+ {
+ ReconnectDelay = MaxReconnectDelay;
+ }
+ }
+ }
+ return !disposed;
+ }
+
+ /// <summary>
+ /// Marks the transport disposed and suppresses its finalizer.
+ /// </summary>
+ public void Dispose()
+ {
+     Dispose(true);
+     GC.SuppressFinalize(this);
+ }
+
+ /// <summary>
+ /// Marks the transport disposed. No resources are released here for
+ /// either value of the flag.
+ /// </summary>
+ /// <param name="disposing">
+ /// True when called from Dispose(), false when called from the finalizer.
+ /// </param>
+ public void Dispose(bool disposing)
+ {
+     // Nothing is released for either value of the flag.
+     disposed = true;
+ }
+
+ /// <summary>
+ /// Orders failover transports by their creation id.
+ /// </summary>
+ /// <param name="o">The transport to compare against.</param>
+ /// <returns>The difference between this id and the other transport's id.</returns>
+ /// <exception cref="ArgumentException">
+ /// Thrown when the argument is not a FailoverTransport.
+ /// </exception>
+ public int CompareTo(Object o)
+ {
+     FailoverTransport other = o as FailoverTransport;
+     if(other == null)
+     {
+         throw new ArgumentException();
+     }
+
+     return this.id - other.id;
+ }
+
+ /// <summary>
+ /// Text form of the connected transport's URI, or "unconnected" when no
+ /// URI is recorded.
+ /// </summary>
+ public override String ToString()
+ {
+     // Read once: a failure on another thread can clear the URI between a
+     // null check and a second read.
+     Uri current = ConnectedTransportURI;
+     return current == null ? "unconnected" : current.ToString();
+ }
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs?rev=961033&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs Tue Jul 6 22:19:34 2010
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Specialized;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Stomp.Transport.Failover
+{
+ /// <summary>
+ /// Transport factory that builds <see cref="FailoverTransport"/> instances
+ /// from a composite URI.
+ /// </summary>
+ public class FailoverTransportFactory : ITransportFactory
+ {
+     /// <summary>
+     /// Creates a failover transport for the given location and wraps it in a
+     /// MutexTransport and then a ResponseCorrelator.
+     /// </summary>
+     /// <param name="location">Composite URI describing the failover transport.</param>
+     /// <returns>The wrapped transport.</returns>
+     public ITransport CreateTransport(Uri location)
+     {
+         return doConnect(location);
+     }
+
+     /// <summary>
+     /// Creates the bare failover transport for the given location, without
+     /// the MutexTransport / ResponseCorrelator wrappers.
+     /// </summary>
+     /// <param name="location">Composite URI describing the failover transport.</param>
+     /// <returns>The unwrapped failover transport.</returns>
+     public ITransport CompositeConnect(Uri location)
+     {
+         URISupport.CompositeData composite = URISupport.parseComposite(location);
+         return CreateTransport(composite);
+     }
+
+     /// <summary>
+     /// Not supported by this factory; always throws.
+     /// </summary>
+     /// <param name="location">Ignored.</param>
+     /// <param name="setTransport">Ignored.</param>
+     /// <returns>Never returns.</returns>
+     public ITransport CompositeConnect(Uri location, SetTransport setTransport)
+     {
+         throw new NMSConnectionException("Asynchronous composite connection not supported with Failover transport.");
+     }
+
+     /// <summary>
+     /// Builds a FailoverTransport configured from the parameters of the
+     /// composite data and adds the composite's component URIs to it.
+     /// </summary>
+     /// <param name="compositData">Parsed composite URI.</param>
+     /// <returns>The configured failover transport.</returns>
+     public ITransport CreateTransport(URISupport.CompositeData compositData)
+     {
+         StringDictionary settings = compositData.Parameters;
+         FailoverTransport failover = CreateTransport(settings);
+         failover.Add(compositData.Components);
+         return failover;
+     }
+
+     /// <summary>
+     /// Creates a new FailoverTransport and applies the given parameters to it
+     /// through URISupport.SetProperties using an empty prefix.
+     /// </summary>
+     /// <param name="parameters">Option names and values to apply.</param>
+     /// <returns>The newly created failover transport.</returns>
+     public FailoverTransport CreateTransport(StringDictionary parameters)
+     {
+         FailoverTransport failover = new FailoverTransport();
+         URISupport.SetProperties(failover, parameters, "");
+         return failover;
+     }
+
+     // Parses the composite URI, creates the failover transport and layers the
+     // MutexTransport and ResponseCorrelator filters on top of it.
+     private ITransport doConnect(Uri location)
+     {
+         URISupport.CompositeData composite = URISupport.parseComposite(location);
+         ITransport failover = CreateTransport(composite);
+         ITransport serialized = new MutexTransport(failover);
+         return new ResponseCorrelator(serialized);
+     }
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ICompositeTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ICompositeTransport.cs?rev=961033&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ICompositeTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ICompositeTransport.cs Tue Jul 6 22:19:34 2010
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.Stomp.Transport
+{
+ /// <summary>
+ /// A transport that additionally exposes operations for adding and
+ /// removing URIs.
+ /// NOTE(review): presumably these are the candidate endpoint URIs of a
+ /// composite transport such as the failover transport - confirm against
+ /// the implementing classes.
+ /// </summary>
+ public interface ICompositeTransport : ITransport
+ {
+ /// <summary>
+ /// Adds the given URIs to this transport.
+ /// </summary>
+ /// <param name="uris">The URIs to add.</param>
+ void Add(Uri[] uris);
+ /// <summary>
+ /// Removes the given URIs from this transport.
+ /// </summary>
+ /// <param name="uris">The URIs to remove.</param>
+ void Remove(Uri[] uris);
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ICompositeTransport.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ICompositeTransport.cs
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs?rev=961033&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs Tue Jul 6 22:19:34 2010
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Stomp.Threads;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Stomp.Transport
+{
+ /// <summary>
+ /// Transport filter that keeps an otherwise idle connection active.
+ /// Once the first command has been received from the peer, a timer
+ /// periodically checks whether anything was sent since the previous
+ /// check and, if nothing was, queues a KeepAliveInfo to be written.
+ /// </summary>
+ public class InactivityMonitor : TransportFilter
+ {
+     // True while the write-check timer and its task runner are active.
+     private Atomic<bool> monitorStarted = new Atomic<bool>(false);
+
+     // Set after every Oneway() call; cleared by each WriteCheck().
+     private Atomic<bool> commandSent = new Atomic<bool>(false);
+     // Set when a command arrives; not read anywhere in this class.
+     private Atomic<bool> commandReceived = new Atomic<bool>(false);
+
+     // Latched by the first exception seen; later writes then fail fast.
+     private Atomic<bool> failed = new Atomic<bool>(false);
+     // Set while OnCommand is running; not read anywhere in this class.
+     private Atomic<bool> inRead = new Atomic<bool>(false);
+     // Set while Oneway is running; also used as the write lock object.
+     private Atomic<bool> inWrite = new Atomic<bool>(false);
+
+     private DedicatedTaskRunner asyncTask;
+     private AsyncWriteTask asyncWriteTask;
+
+     // Guards starting and stopping of the monitor. A plain object is enough
+     // for lock(); a Mutex would allocate an OS handle that is never released.
+     private readonly object monitor = new object();
+
+     private Timer connectionCheckTimer;
+
+     private long maxInactivityDuration = 10000;
+     /// <summary>
+     /// Period of the write-check timer. A value of zero disables the monitor.
+     /// </summary>
+     public long MaxInactivityDuration
+     {
+         get { return this.maxInactivityDuration; }
+         set { this.maxInactivityDuration = value; }
+     }
+
+     private long maxInactivityDurationInitialDelay = 10000;
+     /// <summary>
+     /// Delay before the first write check after the monitor has started.
+     /// </summary>
+     public long MaxInactivityDurationInitialDelay
+     {
+         get { return this.maxInactivityDurationInitialDelay; }
+         set { this.maxInactivityDurationInitialDelay = value; }
+     }
+
+     /// <summary>
+     /// Constructor for the Inactivity Monitor.
+     /// </summary>
+     /// <param name="next">The transport this filter wraps.</param>
+     public InactivityMonitor(ITransport next)
+         : base(next)
+     {
+         Tracer.Debug("Creating Inactivity Monitor");
+     }
+
+     ~InactivityMonitor()
+     {
+         Dispose(false);
+     }
+
+     /// <summary>
+     /// Stops the monitor (in both the dispose and the finalizer path) and
+     /// then lets the base class dispose.
+     /// </summary>
+     /// <param name="disposing">Passed through to the base class unchanged.</param>
+     protected override void Dispose(bool disposing)
+     {
+         StopMonitorThreads();
+
+         base.Dispose(disposing);
+     }
+
+     #region WriteCheck Related
+     /// <summary>
+     /// Timer callback: if nothing was sent since the previous check, wakes
+     /// the task runner so a KeepAliveInfo gets written; then resets the
+     /// "command sent" flag for the next period.
+     /// </summary>
+     /// <param name="unused">Timer state object; ignored.</param>
+     public void WriteCheck(object unused)
+     {
+         if(this.inWrite.Value || this.failed.Value)
+         {
+             Tracer.Debug("Inactivity Monitor is in write or already failed.");
+             return;
+         }
+
+         if(!commandSent.Value)
+         {
+             Tracer.Debug("No Message sent since last write check. Sending a KeepAliveInfo");
+
+             // StopMonitorThreads() sets asyncTask to null and only waits a
+             // bounded time for the timer, so a late callback must not
+             // dereference the field directly.
+             DedicatedTaskRunner runner = this.asyncTask;
+             if(runner != null)
+             {
+                 runner.Wakeup();
+             }
+         }
+         else
+         {
+             Tracer.Debug("Message sent since last write check. Resetting flag");
+         }
+
+         commandSent.Value = false;
+     }
+     #endregion
+
+     /// <summary>
+     /// Stops the monitor and then the wrapped transport.
+     /// </summary>
+     public override void Stop()
+     {
+         StopMonitorThreads();
+         next.Stop();
+     }
+
+     /// <summary>
+     /// Handles an incoming command: starts the monitor if it is not running
+     /// yet, then forwards the command to the base class.
+     /// </summary>
+     protected override void OnCommand(ITransport sender, Command command)
+     {
+         commandReceived.Value = true;
+         inRead.Value = true;
+         try
+         {
+             try
+             {
+                 StartMonitorThreads();
+             }
+             catch(IOException ex)
+             {
+                 OnException(this, ex);
+             }
+
+             base.OnCommand(sender, command);
+         }
+         finally
+         {
+             inRead.Value = false;
+         }
+     }
+
+     /// <summary>
+     /// Sends a command through the wrapped transport. Calls are serialized
+     /// because this class invokes the method from more than one thread.
+     /// </summary>
+     /// <exception cref="IOException">
+     /// Thrown without sending when the monitor has already recorded a failure.
+     /// </exception>
+     public override void Oneway(Command command)
+     {
+         // Disable inactivity monitoring while processing a command.
+         // synchronize this method - its not synchronized
+         // further down the transport stack and gets called by more
+         // than one thread by this class
+         lock(inWrite)
+         {
+             inWrite.Value = true;
+             try
+             {
+                 if(failed.Value)
+                 {
+                     // Concatenate the address directly: a null RemoteAddress
+                     // must not turn this IOException into a NullReferenceException.
+                     throw new IOException("Channel was inactive for too long: " + next.RemoteAddress);
+                 }
+
+                 next.Oneway(command);
+             }
+             finally
+             {
+                 // Recorded even when the write threw, so the next WriteCheck
+                 // treats this period as active.
+                 commandSent.Value = true;
+                 inWrite.Value = false;
+             }
+         }
+     }
+
+     /// <summary>
+     /// Records the failure, stops the monitor and forwards the exception.
+     /// Only the first exception is processed; later ones are ignored.
+     /// </summary>
+     protected override void OnException(ITransport sender, Exception command)
+     {
+         if(failed.CompareAndSet(false, true))
+         {
+             Tracer.Debug("Exception received in the Inactivity Monitor: " + command.ToString());
+             StopMonitorThreads();
+             base.OnException(sender, command);
+         }
+     }
+
+     // Creates the keep-alive task runner and the write-check timer. Does
+     // nothing when already started or when MaxInactivityDuration is zero.
+     private void StartMonitorThreads()
+     {
+         lock(monitor)
+         {
+             if(monitorStarted.Value || maxInactivityDuration == 0)
+             {
+                 return;
+             }
+
+             Tracer.DebugFormat("Inactivity: Write Check time interval: {0}", maxInactivityDuration );
+             Tracer.DebugFormat("Inactivity: Initial Delay time interval: {0}", maxInactivityDurationInitialDelay );
+
+             this.asyncWriteTask = new AsyncWriteTask(this);
+             this.asyncTask = new DedicatedTaskRunner(this.asyncWriteTask);
+
+             monitorStarted.Value = true;
+
+             this.connectionCheckTimer = new Timer(
+                 new TimerCallback(WriteCheck),
+                 null,
+                 maxInactivityDurationInitialDelay,
+                 maxInactivityDuration
+                 );
+         }
+     }
+
+     // Disposes the timer and shuts the task runner down. Does nothing when
+     // the monitor is not running.
+     private void StopMonitorThreads()
+     {
+         lock(monitor)
+         {
+             if(monitorStarted.CompareAndSet(true, false))
+             {
+                 // Deliberately not closed here: if the wait below times out,
+                 // the timer may presumably still signal this event later.
+                 AutoResetEvent shutdownEvent = new AutoResetEvent(false);
+
+                 // Attempt to wait for the Timer to shutdown, but don't wait
+                 // forever, if they don't shutdown after two seconds, just quit.
+                 this.connectionCheckTimer.Dispose(shutdownEvent);
+                 shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000), false);
+                 this.connectionCheckTimer = null;
+
+                 this.asyncTask.Shutdown();
+                 this.asyncTask = null;
+                 this.asyncWriteTask = null;
+             }
+         }
+     }
+
+     #region Async Tasks
+
+     // Task that fires when the TaskRunner is signaled by the WriteCheck Timer Task.
+     class AsyncWriteTask : Task
+     {
+         private InactivityMonitor parent;
+
+         public AsyncWriteTask(InactivityMonitor parent)
+         {
+             this.parent = parent;
+         }
+
+         // Sends one KeepAliveInfo through the parent while the monitor is
+         // running. Always returns false.
+         public bool Iterate()
+         {
+             Tracer.Debug("AsyncWriteTask perparing for another Write Check");
+             if(this.parent.monitorStarted.Value)
+             {
+                 try
+                 {
+                     Tracer.Debug("AsyncWriteTask Write Check required sending KeepAlive.");
+                     KeepAliveInfo info = new KeepAliveInfo();
+                     this.parent.Oneway(info);
+                 }
+                 catch(IOException e)
+                 {
+                     this.parent.OnException(parent, e);
+                 }
+             }
+
+             return false;
+         }
+     }
+     #endregion
+ }
+
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=961033&r1=961032&r2=961033&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs Tue Jul 6 22:19:34 2010
@@ -39,6 +39,16 @@ namespace Apache.NMS.Stomp.Transport.Tcp
set { useLogging = value; }
}
+ private bool useInactivityMonitor = true;
+ /// <summary>
+ /// Whether transports created by this factory are wrapped in an InactivityMonitor. Defaults to true.
+ /// </summary>
+ public bool UseInactivityMonitor
+ {
+ get { return this.useInactivityMonitor; }
+ set { this.useInactivityMonitor = value; }
+ }
+
/// <summary>
/// Size in bytes of the receive buffer.
/// </summary>
@@ -128,6 +138,11 @@ namespace Apache.NMS.Stomp.Transport.Tcp
transport = new LoggingTransport(transport);
}
+ if(UseInactivityMonitor)
+ {
+ transport = new InactivityMonitor(transport);
+ }
+
transport.RequestTimeout = this.requestTimeout;
if(setTransport != null)
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFactory.cs?rev=961033&r1=961032&r2=961033&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFactory.cs Tue Jul 6 22:19:34 2010
@@ -18,6 +18,7 @@
using System;
using Apache.NMS.Stomp.Transport.Tcp;
+using Apache.NMS.Stomp.Transport.Failover;
namespace Apache.NMS.Stomp.Transport
{
@@ -77,6 +78,9 @@ namespace Apache.NMS.Stomp.Transport
{
switch(scheme.ToLower())
{
+ case "failover":
+ factory = new FailoverTransportFactory();
+ break;
case "tcp":
factory = new TcpTransportFactory();
break;
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MapMessageTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MapMessageTest.cs?rev=961033&r1=961032&r2=961033&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MapMessageTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MapMessageTest.cs Tue Jul 6 22:19:34 2010
@@ -56,7 +56,8 @@ namespace Apache.NMS.Stomp.Test
connection.Start();
using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
- string TOPIC = DESTINATION_NAME + "?consumer.transformation=jms-map-xml";
+ string TOPIC = DESTINATION_NAME + "?consumer.transformation=jms-xml";
+// string TOPIC = DESTINATION_NAME;
IDestination destination = session.GetTopic(TOPIC);
using(IMessageConsumer consumer = session.CreateConsumer(destination))
using(IMessageProducer producer = session.CreateProducer(destination))
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj?rev=961033&r1=961032&r2=961033&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj Tue Jul 6 22:19:34 2010
@@ -60,14 +60,15 @@
<Reference Include="nunit.framework.extensions, Version=2.4.8.0, Culture=neutral, PublicKeyToken=96d09a1eb7f44a77, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>lib\NUnit\net-2.0\nunit.framework.extensions.dll</HintPath>
+ <Package>mono-nunit</Package>
</Reference>
<Reference Include="System" />
<Reference Include="System.Xml" />
- <Reference Include="Apache.NMS, Version=1.3.0.1946, Culture=neutral, PublicKeyToken=82756feee3957618">
+ <Reference Include="Apache.NMS, Version=1.4.0.2007, Culture=neutral, PublicKeyToken=82756feee3957618">
<SpecificVersion>False</SpecificVersion>
<HintPath>build\mono-2.0\debug\Apache.NMS.dll</HintPath>
</Reference>
- <Reference Include="Apache.NMS.Stomp, Version=1.3.0.1949, Culture=neutral, PublicKeyToken=82756feee3957618">
+ <Reference Include="Apache.NMS.Stomp, Version=1.4.0.2010, Culture=neutral, PublicKeyToken=82756feee3957618">
<SpecificVersion>False</SpecificVersion>
<HintPath>build\mono-2.0\debug\Apache.NMS.Stomp.dll</HintPath>
</Reference>
@@ -97,7 +98,7 @@
<Properties>
<Policies>
<TextStylePolicy FileWidth="120" RemoveTrailingWhitespace="True" inheritsSet="VisualStudio" inheritsScope="text/plain" />
- <StandardHeader Text="/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
" inheritsSet="MITX11License" />
+ <StandardHeader Text="/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
" inheritsSet="Apache2License" />
</Policies>
</Properties>
</MonoDevelop>
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.csproj?rev=961033&r1=961032&r2=961033&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.csproj Tue Jul 6 22:19:34 2010
@@ -55,7 +55,7 @@
<ItemGroup>
<Reference Include="System" />
<Reference Include="System.Xml" />
- <Reference Include="Apache.NMS, Version=1.3.0.1893, Culture=neutral, PublicKeyToken=82756feee3957618">
+ <Reference Include="Apache.NMS, Version=1.4.0.2007, Culture=neutral, PublicKeyToken=82756feee3957618">
<SpecificVersion>False</SpecificVersion>
<HintPath>build\mono-2.0\debug\Apache.NMS.dll</HintPath>
</Reference>
@@ -146,6 +146,20 @@
<Compile Include="src\main\csharp\Protocol\XmlPrimitiveMapMarshaler.cs" />
<Compile Include="src\main\csharp\Transport\Tcp\SslTransport.cs" />
<Compile Include="src\main\csharp\Transport\Tcp\SslTransportFactory.cs" />
+ <Compile Include="src\main\csharp\Transport\Failover\FailoverTransport.cs" />
+ <Compile Include="src\main\csharp\Transport\Failover\FailoverTransportFactory.cs" />
+ <Compile Include="src\main\csharp\Transport\InactivityMonitor.cs" />
+ <Compile Include="src\main\csharp\Commands\KeepAliveInfo.cs" />
+ <Compile Include="src\main\csharp\State\CommandVisitorAdapter.cs" />
+ <Compile Include="src\main\csharp\State\ConnectionState.cs" />
+ <Compile Include="src\main\csharp\State\ConnectionStateTracker.cs" />
+ <Compile Include="src\main\csharp\State\ConsumerState.cs" />
+ <Compile Include="src\main\csharp\State\ICommandVisitor.cs" />
+ <Compile Include="src\main\csharp\State\ThreadSimulator.cs" />
+ <Compile Include="src\main\csharp\State\Tracked.cs" />
+ <Compile Include="src\main\csharp\Transport\ICompositeTransport.cs" />
+ <Compile Include="src\main\csharp\State\SynchronizedObjects.cs" />
+ <Compile Include="src\main\csharp\IOException.cs" />
</ItemGroup>
<ItemGroup>
<None Include="keyfile\NMSKey.snk" />
@@ -157,10 +171,13 @@
<Properties>
<Policies>
<TextStylePolicy FileWidth="120" RemoveTrailingWhitespace="True" inheritsSet="VisualStudio" inheritsScope="text/plain" />
- <StandardHeader Text="/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
" inheritsSet="MITX11License" />
+ <StandardHeader Text="/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
" inheritsSet="Apache2License" />
</Policies>
</Properties>
</MonoDevelop>
<VisualStudio />
</ProjectExtensions>
+ <ItemGroup>
+ <Folder Include="src\main\csharp\State\" />
+ </ItemGroup>
</Project>
\ No newline at end of file
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.sln
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.sln?rev=961033&r1=961032&r2=961033&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.sln (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.sln Tue Jul 6 22:19:34 2010
@@ -20,11 +20,7 @@ Global
{E8C995C3-FF81-491B-A3B7-9D7C753BDDC3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E8C995C3-FF81-491B-A3B7-9D7C753BDDC3}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
- GlobalSection(SolutionProperties) = preSolution
- HideSolutionNode = FALSE
- EndGlobalSection
GlobalSection(MonoDevelopProperties) = preSolution
- version = 0.1
StartupItem = vs2008-stomp.csproj
Policies = $0
$0.TextStylePolicy = $1
@@ -41,4 +37,7 @@ Global
$3.inheritsScope = text/x-csharp
$3.scope = text/x-csharp
EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
EndGlobal