You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2013/12/17 16:34:55 UTC
svn commit: r1551577 [2/2] -
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=1551577&r1=1551576&r2=1551577&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Tue Dec 17 15:34:55 2013
@@ -1,1679 +1,1689 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using System.Collections;
-using System.Collections.Generic;
-using System.Threading;
-using System.Text;
-using System.Net;
-using Apache.NMS.ActiveMQ.Commands;
-using Apache.NMS.ActiveMQ.State;
-using Apache.NMS.ActiveMQ.Threads;
-using Apache.NMS.Util;
-
-namespace Apache.NMS.ActiveMQ.Transport.Failover
-{
- /// <summary>
- /// A Transport that is made reliable by being able to fail over to another
- /// transport when a transport failure is detected.
- /// </summary>
- public class FailoverTransport : ICompositeTransport, IComparable
- {
- private static int DEFAULT_INITIAL_RECONNECT_DELAY = 10;
- private static int INFINITE = -1;
-
- private static int idCounter = 0;
- private readonly int id;
-
- private bool disposed;
- private bool connected;
- private readonly List<Uri> uris = new List<Uri>();
- private readonly List<Uri> updated = new List<Uri>();
-
- private CommandHandler commandHandler;
- private ExceptionHandler exceptionHandler;
- private InterruptedHandler interruptedHandler;
- private ResumedHandler resumedHandler;
-
- private readonly CountDownLatch listenerLatch = new CountDownLatch(4);
- private readonly Mutex reconnectMutex = new Mutex();
- private readonly Mutex backupMutex = new Mutex();
- private readonly Mutex sleepMutex = new Mutex();
- private readonly ConnectionStateTracker stateTracker = new ConnectionStateTracker();
- private readonly Dictionary<int, Command> requestMap = new Dictionary<int, Command>();
-
- private Uri connectedTransportURI;
- private Uri failedConnectTransportURI;
- private readonly AtomicReference<ITransport> connectedTransport = new AtomicReference<ITransport>(null);
- private TaskRunner reconnectTask = null;
- private bool started;
- private bool initialized;
- private int initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
- private int maxReconnectDelay = 1000 * 30;
- private int backOffMultiplier = 2;
- private int timeout = INFINITE;
- private bool useExponentialBackOff = true;
- private bool randomize = true;
- private int maxReconnectAttempts = INFINITE;
- private int startupMaxReconnectAttempts = INFINITE;
- private int connectFailures;
- private int reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
- private Exception connectionFailure;
- private bool firstConnection = true;
- private bool backup = false;
- private readonly List<BackupTransport> backups = new List<BackupTransport>();
- private int backupPoolSize = 1;
- private bool trackMessages = false;
- private bool trackTransactionProducers = true;
- private int maxCacheSize = 256;
- private volatile Exception failure;
- private readonly object mutex = new object();
- private bool reconnectSupported = true;
- private bool updateURIsSupported = true;
- private bool doRebalance = false;
- private bool connectedToPriority = false;
- private bool priorityBackup = false;
- private List<Uri> priorityList = new List<Uri>();
- private bool priorityBackupAvailable = false;
-
- // Not Sure how to work these back in with all the changes.
- //private int asyncTimeout = 45000;
- //private bool asyncConnect = false;
-
- public FailoverTransport()
- {
- id = idCounter++;
-
- stateTracker.TrackTransactions = true;
- reconnectTask = DefaultThreadPools.DefaultTaskRunnerFactory.CreateTaskRunner(
- new FailoverTask(this), "ActiveMQ Failover Worker: " + this.GetHashCode().ToString());
- }
-
- ~FailoverTransport()
- {
- Dispose(false);
- }
-
- #region FailoverTask
-
- private class FailoverTask : Task
- {
- private readonly FailoverTransport parent;
-
- public FailoverTask(FailoverTransport p)
- {
- parent = p;
- }
-
- public bool Iterate()
- {
- bool result = false;
- if (!parent.IsStarted)
- {
- return false;
- }
-
- bool buildBackup = true;
- lock (parent.backupMutex)
- {
- if ((parent.connectedTransport.Value == null || parent.doRebalance || parent.priorityBackupAvailable) && !parent.disposed)
- {
- result = parent.DoConnect();
- buildBackup = false;
- }
- }
- if (buildBackup)
- {
- parent.BuildBackups();
- if (parent.priorityBackup && !parent.connectedToPriority)
- {
- try
- {
- parent.DoDelay();
- if (parent.reconnectTask == null)
- {
- return true;
- }
- parent.reconnectTask.Wakeup();
- }
- catch (ThreadInterruptedException)
- {
- Tracer.Debug("Reconnect task has been interrupted.");
- }
- }
- }
- else
- {
- try
- {
- if (parent.reconnectTask == null)
- {
- return true;
- }
- parent.reconnectTask.Wakeup();
- }
- catch (ThreadInterruptedException)
- {
- Tracer.Debug("Reconnect task has been interrupted.");
- }
- }
- return result;
- }
- }
-
- #endregion
-
- #region Property Accessors
-
- public CommandHandler Command
- {
- get { return commandHandler; }
- set
- {
- commandHandler = value;
- listenerLatch.countDown();
- }
- }
-
- public ExceptionHandler Exception
- {
- get { return exceptionHandler; }
- set
- {
- exceptionHandler = value;
- listenerLatch.countDown();
- }
- }
-
- public InterruptedHandler Interrupted
- {
- get { return interruptedHandler; }
- set
- {
- this.interruptedHandler = value;
- this.listenerLatch.countDown();
- }
- }
-
- public ResumedHandler Resumed
- {
- get { return resumedHandler; }
- set
- {
- this.resumedHandler = value;
- this.listenerLatch.countDown();
- }
- }
-
- internal Exception Failure
- {
- get { return failure; }
- set
- {
- lock(mutex)
- {
- failure = value;
- }
- }
- }
-
- public int Timeout
- {
- get { return this.timeout; }
- set { this.timeout = value; }
- }
-
- public int InitialReconnectDelay
- {
- get { return initialReconnectDelay; }
- set { initialReconnectDelay = value; }
- }
-
- public int MaxReconnectDelay
- {
- get { return maxReconnectDelay; }
- set { maxReconnectDelay = value; }
- }
-
- public int ReconnectDelay
- {
- get { return reconnectDelay; }
- set { reconnectDelay = value; }
- }
-
- public int ReconnectDelayExponent
- {
- get { return backOffMultiplier; }
- set { backOffMultiplier = value; }
- }
-
- public ITransport ConnectedTransport
- {
- get { return connectedTransport.Value; }
- set { connectedTransport.Value = value; }
- }
-
- public Uri ConnectedTransportURI
- {
- get { return connectedTransportURI; }
- set { connectedTransportURI = value; }
- }
-
- public int MaxReconnectAttempts
- {
- get { return maxReconnectAttempts; }
- set { maxReconnectAttempts = value; }
- }
-
- public int StartupMaxReconnectAttempts
- {
- get { return startupMaxReconnectAttempts; }
- set { startupMaxReconnectAttempts = value; }
- }
-
- public bool Randomize
- {
- get { return randomize; }
- set { randomize = value; }
- }
-
- public bool Backup
- {
- get { return backup; }
- set { backup = value; }
- }
-
- public bool PriorityBackup
- {
- get { return priorityBackup; }
- set { this.priorityBackup = value; }
- }
-
- public String PriorityURIs
- {
- get { return PrintableUriList(priorityList); }
- set { this.ProcessDelimitedUriList(value, priorityList); }
- }
-
- public int BackupPoolSize
- {
- get { return backupPoolSize; }
- set { backupPoolSize = value; }
- }
-
- public bool TrackMessages
- {
- get { return trackMessages; }
- set { trackMessages = value; }
- }
-
- public bool TrackTransactionProducers
- {
- get { return trackTransactionProducers; }
- set { this.trackTransactionProducers = value; }
- }
-
- public int MaxCacheSize
- {
- get { return maxCacheSize; }
- set { maxCacheSize = value; }
- }
-
- public bool UseExponentialBackOff
- {
- get { return useExponentialBackOff; }
- set { useExponentialBackOff = value; }
- }
-
- public IWireFormat WireFormat
- {
- get
- {
- ITransport transport = ConnectedTransport;
- if(transport != null)
- {
- return transport.WireFormat;
- }
-
- return null;
- }
- }
-
- /// <summary>
- /// Gets or sets a value indicating whether to asynchronously connect to sockets
- /// </summary>
- /// <value><c>true</c> if [async connect]; otherwise, <c>false</c>.</value>
- public bool AsyncConnect
- {
- set { }
- }
-
- /// <summary>
- /// If doing an asynchronous connect, the milliseconds before timing out if no connection can be made
- /// </summary>
- /// <value>The async timeout.</value>
- public int AsyncTimeout
- {
- get { return 0; }
- set { }
- }
-
- public ConnectionStateTracker StateTracker
- {
- get { return this.stateTracker; }
- }
-
- #endregion
-
- public bool IsFaultTolerant
- {
- get { return true; }
- }
-
- public bool IsDisposed
- {
- get { return disposed; }
- }
-
- public bool IsConnected
- {
- get { return connected; }
- }
-
- public bool IsConnectedToPriority
- {
- get { return connectedToPriority; }
- }
-
- public bool IsStarted
- {
- get { return started; }
- }
-
- public bool IsReconnectSupported
- {
- get { return this.reconnectSupported; }
- }
-
- public bool IsUpdateURIsSupported
- {
- get { return this.updateURIsSupported; }
- }
-
- public void OnException(ITransport sender, Exception error)
- {
- try
- {
- HandleTransportFailure(error);
- }
- catch(Exception)
- {
- this.Exception(this, new IOException("Unexpected Transport Failure."));
- }
- }
-
- public void DisposedOnCommand(ITransport sender, Command c)
- {
- }
-
- public void DisposedOnException(ITransport sender, Exception e)
- {
- }
-
- public void HandleTransportFailure(Exception e)
- {
- ITransport transport = connectedTransport.GetAndSet(null);
- if (transport == null)
- {
- // sync with possible in progress reconnect
- lock(reconnectMutex)
- {
- transport = connectedTransport.GetAndSet(null);
- }
- }
-
- if(transport != null)
- {
- DisposeTransport(transport);
-
- bool reconnectOk = false;
- lock(reconnectMutex)
- {
- if (CanReconnect())
- {
- Tracer.WarnFormat("Transport failed to {0}, attempting to automatically reconnect due to: {1}",
- ConnectedTransportURI, e.Message);
- reconnectOk = true;
- }
-
- initialized = false;
- failedConnectTransportURI = ConnectedTransportURI;
- ConnectedTransportURI = null;
- connectedToPriority = false;
- connected = false;
-
- if (reconnectOk)
- {
- if(this.Interrupted != null)
- {
- this.Interrupted(transport);
- }
-
- updated.Remove(failedConnectTransportURI);
- reconnectTask.Wakeup();
- }
- else if (!disposed)
- {
- PropagateFailureToExceptionListener(e);
- }
- }
- }
- }
-
- private bool CanReconnect()
- {
- return started && 0 != CalculateReconnectAttemptLimit();
- }
-
- public void Start()
- {
- lock(reconnectMutex)
- {
- if(started)
- {
- Tracer.Debug("FailoverTransport Already Started.");
- return;
- }
-
- Tracer.Debug("FailoverTransport Started.");
- started = true;
- stateTracker.MaxCacheSize = MaxCacheSize;
- stateTracker.TrackMessages = TrackMessages;
- stateTracker.TrackTransactionProducers = TrackTransactionProducers;
- if(ConnectedTransport != null)
- {
- Tracer.Debug("FailoverTransport already connected, start is restoring.");
- stateTracker.DoRestore(ConnectedTransport);
- }
- else
- {
- Tracer.Debug("FailoverTransport not connected, start is reconnecting.");
- Reconnect(false);
- }
- }
- }
-
- public virtual void Stop()
- {
- ITransport transportToStop = null;
- List<ITransport> backupsToStop = new List<ITransport>(backups.Count);
-
- try
- {
- lock(reconnectMutex)
- {
- if(!started)
- {
- Tracer.Debug("FailoverTransport Already Stopped.");
- return;
- }
-
- Tracer.Debug("FailoverTransport Stopped.");
- started = false;
- disposed = true;
- connected = false;
- if(ConnectedTransport != null)
- {
- transportToStop = connectedTransport.GetAndSet(null);
- }
- }
- lock(sleepMutex)
- {
- Monitor.PulseAll(sleepMutex);
- }
- }
- finally
- {
- if(reconnectTask != null)
- {
- reconnectTask.Shutdown();
- }
- }
-
- lock(backupMutex)
- {
- foreach (BackupTransport backup in backups)
- {
- backup.Disposed = true;
- ITransport transport = backup.Transport;
- if (transport != null)
- {
- transport.Command = DisposedOnCommand;
- transport.Exception = DisposedOnException;
- backupsToStop.Add(transport);
- }
- }
- backups.Clear();
- }
-
- foreach (ITransport transport in backupsToStop)
- {
- try
- {
- if (Tracer.IsDebugEnabled)
- {
- Tracer.Debug("Stopped backup: " + transport);
- }
- DisposeTransport(transport);
- }
- catch (Exception)
- {
- }
- }
-
- if(transportToStop != null)
- {
- transportToStop.Stop();
- }
- }
-
- public FutureResponse AsyncRequest(Command command)
- {
- throw new ApplicationException("FailoverTransport does not implement AsyncRequest(Command)");
- }
-
- public Response Request(Command command)
- {
- throw new ApplicationException("FailoverTransport does not implement Request(Command)");
- }
-
- public Response Request(Command command, TimeSpan ts)
- {
- throw new ApplicationException("FailoverTransport does not implement Request(Command, TimeSpan)");
- }
-
- public void OnCommand(ITransport sender, Command command)
- {
- if(command != null)
- {
- if(command.IsResponse)
- {
- Command request = null;
- lock(((ICollection) requestMap).SyncRoot)
- {
- int v = ((Response) command).CorrelationId;
- try
- {
- if(requestMap.TryGetValue(v, out request))
- {
- requestMap.Remove(v);
- }
- }
- catch
- {
- }
- }
-
- Tracked tracked = request as Tracked;
- if(tracked != null)
- {
- tracked.OnResponse();
- }
- }
-
- if(!initialized)
- {
- initialized = true;
- }
-
- if(command.IsConnectionControl)
- {
- this.HandleConnectionControl(command as ConnectionControl);
- }
- }
-
- this.Command(sender, command);
- }
-
- public void Oneway(Command command)
- {
- Exception error = null;
-
- lock(reconnectMutex)
- {
- if(command != null && ConnectedTransport == null)
- {
- if(command.IsShutdownInfo)
- {
- // Skipping send of ShutdownInfo command when not connected.
- return;
- }
- else if(command.IsRemoveInfo || command.IsMessageAck)
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Threading;
+using System.Text;
+using System.Net;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.State;
+using Apache.NMS.ActiveMQ.Threads;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Failover
+{
+ /// <summary>
+ /// A Transport that is made reliable by being able to fail over to another
+ /// transport when a transport failure is detected.
+ /// </summary>
+ public class FailoverTransport : ICompositeTransport, IComparable
+ {
+ private static int DEFAULT_INITIAL_RECONNECT_DELAY = 10;
+ private static int INFINITE = -1;
+
+ private static int idCounter = 0;
+ private readonly int id;
+
+ private bool disposed;
+ private bool connected;
+ private readonly List<Uri> uris = new List<Uri>();
+ private readonly List<Uri> updated = new List<Uri>();
+
+ private CommandHandler commandHandler;
+ private ExceptionHandler exceptionHandler;
+ private InterruptedHandler interruptedHandler;
+ private ResumedHandler resumedHandler;
+
+ private readonly CountDownLatch listenerLatch = new CountDownLatch(4);
+ private readonly Mutex reconnectMutex = new Mutex();
+ private readonly Mutex backupMutex = new Mutex();
+ private readonly Mutex sleepMutex = new Mutex();
+ private readonly ConnectionStateTracker stateTracker = new ConnectionStateTracker();
+ private readonly Dictionary<int, Command> requestMap = new Dictionary<int, Command>();
+
+ private Uri connectedTransportURI;
+ private Uri failedConnectTransportURI;
+ private readonly AtomicReference<ITransport> connectedTransport = new AtomicReference<ITransport>(null);
+ private TaskRunner reconnectTask = null;
+ private bool started;
+ private bool initialized;
+ private int initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
+ private int maxReconnectDelay = 1000 * 30;
+ private int backOffMultiplier = 2;
+ private int timeout = INFINITE;
+ private bool useExponentialBackOff = true;
+ private bool randomize = true;
+ private int maxReconnectAttempts = INFINITE;
+ private int startupMaxReconnectAttempts = INFINITE;
+ private int connectFailures;
+ private int reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
+ private Exception connectionFailure;
+ private bool firstConnection = true;
+ private bool backup = false;
+ private readonly List<BackupTransport> backups = new List<BackupTransport>();
+ private int backupPoolSize = 1;
+ private bool trackMessages = false;
+ private bool trackTransactionProducers = true;
+ private int maxCacheSize = 256;
+ private volatile Exception failure;
+ private readonly object mutex = new object();
+ private bool reconnectSupported = true;
+ private bool updateURIsSupported = true;
+ private bool doRebalance = false;
+ private bool connectedToPriority = false;
+ private bool priorityBackup = false;
+ private List<Uri> priorityList = new List<Uri>();
+ private bool priorityBackupAvailable = false;
+
+ // Not Sure how to work these back in with all the changes.
+ //private int asyncTimeout = 45000;
+ //private bool asyncConnect = false;
+
+ public FailoverTransport()
+ {
+ id = idCounter++;
+
+ stateTracker.TrackTransactions = true;
+ reconnectTask = DefaultThreadPools.DefaultTaskRunnerFactory.CreateTaskRunner(
+ new FailoverTask(this), "ActiveMQ Failover Worker: " + this.GetHashCode().ToString());
+ }
+
+ ~FailoverTransport()
+ {
+ Dispose(false);
+ }
+
+ #region FailoverTask
+
+ private class FailoverTask : Task
+ {
+ private readonly FailoverTransport parent;
+
+ public FailoverTask(FailoverTransport p)
+ {
+ parent = p;
+ }
+
+ public bool Iterate()
+ {
+ bool result = false;
+ if (!parent.IsStarted)
+ {
+ return false;
+ }
+
+ bool buildBackup = true;
+ lock (parent.backupMutex)
+ {
+ if ((parent.connectedTransport.Value == null || parent.doRebalance || parent.priorityBackupAvailable) && !parent.disposed)
+ {
+ result = parent.DoConnect();
+ buildBackup = false;
+ }
+ }
+ if (buildBackup)
+ {
+ parent.BuildBackups();
+ if (parent.priorityBackup && !parent.connectedToPriority)
+ {
+ try
+ {
+ parent.DoDelay();
+ if (parent.reconnectTask == null)
+ {
+ return true;
+ }
+ parent.reconnectTask.Wakeup();
+ }
+ catch (ThreadInterruptedException)
+ {
+ Tracer.Debug("Reconnect task has been interrupted.");
+ }
+ }
+ }
+ else
+ {
+ try
+ {
+ if (parent.reconnectTask == null)
+ {
+ return true;
+ }
+ parent.reconnectTask.Wakeup();
+ }
+ catch (ThreadInterruptedException)
+ {
+ Tracer.Debug("Reconnect task has been interrupted.");
+ }
+ }
+ return result;
+ }
+ }
+
+ #endregion
+
+ #region Property Accessors
+
+ public CommandHandler Command
+ {
+ get { return commandHandler; }
+ set
+ {
+ commandHandler = value;
+ listenerLatch.countDown();
+ }
+ }
+
+ public ExceptionHandler Exception
+ {
+ get { return exceptionHandler; }
+ set
+ {
+ exceptionHandler = value;
+ listenerLatch.countDown();
+ }
+ }
+
+ public InterruptedHandler Interrupted
+ {
+ get { return interruptedHandler; }
+ set
+ {
+ this.interruptedHandler = value;
+ this.listenerLatch.countDown();
+ }
+ }
+
+ public ResumedHandler Resumed
+ {
+ get { return resumedHandler; }
+ set
+ {
+ this.resumedHandler = value;
+ this.listenerLatch.countDown();
+ }
+ }
+
+ internal Exception Failure
+ {
+ get { return failure; }
+ set
+ {
+ lock(mutex)
+ {
+ failure = value;
+ }
+ }
+ }
+
+ public int Timeout
+ {
+ get { return this.timeout; }
+ set { this.timeout = value; }
+ }
+
+ public int InitialReconnectDelay
+ {
+ get { return initialReconnectDelay; }
+ set { initialReconnectDelay = value; }
+ }
+
+ public int MaxReconnectDelay
+ {
+ get { return maxReconnectDelay; }
+ set { maxReconnectDelay = value; }
+ }
+
+ public int ReconnectDelay
+ {
+ get { return reconnectDelay; }
+ set { reconnectDelay = value; }
+ }
+
+ public int ReconnectDelayExponent
+ {
+ get { return backOffMultiplier; }
+ set { backOffMultiplier = value; }
+ }
+
+ public ITransport ConnectedTransport
+ {
+ get { return connectedTransport.Value; }
+ set { connectedTransport.Value = value; }
+ }
+
+ public Uri ConnectedTransportURI
+ {
+ get { return connectedTransportURI; }
+ set { connectedTransportURI = value; }
+ }
+
+ public int MaxReconnectAttempts
+ {
+ get { return maxReconnectAttempts; }
+ set { maxReconnectAttempts = value; }
+ }
+
+ public int StartupMaxReconnectAttempts
+ {
+ get { return startupMaxReconnectAttempts; }
+ set { startupMaxReconnectAttempts = value; }
+ }
+
+ public bool Randomize
+ {
+ get { return randomize; }
+ set { randomize = value; }
+ }
+
+ public bool Backup
+ {
+ get { return backup; }
+ set { backup = value; }
+ }
+
+ public bool PriorityBackup
+ {
+ get { return priorityBackup; }
+ set { this.priorityBackup = value; }
+ }
+
+ public String PriorityURIs
+ {
+ get { return PrintableUriList(priorityList); }
+ set { this.ProcessDelimitedUriList(value, priorityList); }
+ }
+
+ public int BackupPoolSize
+ {
+ get { return backupPoolSize; }
+ set { backupPoolSize = value; }
+ }
+
+ public bool TrackMessages
+ {
+ get { return trackMessages; }
+ set { trackMessages = value; }
+ }
+
+ public bool TrackTransactionProducers
+ {
+ get { return trackTransactionProducers; }
+ set { this.trackTransactionProducers = value; }
+ }
+
+ public int MaxCacheSize
+ {
+ get { return maxCacheSize; }
+ set { maxCacheSize = value; }
+ }
+
+ public bool UseExponentialBackOff
+ {
+ get { return useExponentialBackOff; }
+ set { useExponentialBackOff = value; }
+ }
+
+ public IWireFormat WireFormat
+ {
+ get
+ {
+ ITransport transport = ConnectedTransport;
+ if(transport != null)
+ {
+ return transport.WireFormat;
+ }
+
+ return null;
+ }
+ }
+
+ /// <summary>
+ /// Gets or sets a value indicating whether to asynchronously connect to sockets
+ /// </summary>
+ /// <value><c>true</c> if [async connect]; otherwise, <c>false</c>.</value>
+ public bool AsyncConnect
+ {
+ set { }
+ }
+
+ /// <summary>
+ /// If doing an asynchronous connect, the milliseconds before timing out if no connection can be made
+ /// </summary>
+ /// <value>The async timeout.</value>
+ public int AsyncTimeout
+ {
+ get { return 0; }
+ set { }
+ }
+
+ public ConnectionStateTracker StateTracker
+ {
+ get { return this.stateTracker; }
+ }
+
+ #endregion
+
+ public bool IsFaultTolerant
+ {
+ get { return true; }
+ }
+
+ public bool IsDisposed
+ {
+ get { return disposed; }
+ }
+
+ public bool IsConnected
+ {
+ get { return connected; }
+ }
+
+ public bool IsConnectedToPriority
+ {
+ get { return connectedToPriority; }
+ }
+
+ public bool IsStarted
+ {
+ get { return started; }
+ }
+
+ public bool IsReconnectSupported
+ {
+ get { return this.reconnectSupported; }
+ }
+
+ public bool IsUpdateURIsSupported
+ {
+ get { return this.updateURIsSupported; }
+ }
+
+ public void OnException(ITransport sender, Exception error)
+ {
+ try
+ {
+ HandleTransportFailure(error);
+ }
+ catch(Exception)
+ {
+ this.Exception(this, new IOException("Unexpected Transport Failure."));
+ }
+ }
+
+ public void DisposedOnCommand(ITransport sender, Command c)
+ {
+ }
+
+ public void DisposedOnException(ITransport sender, Exception e)
+ {
+ }
+
+ public void HandleTransportFailure(Exception e)
+ {
+ ITransport transport = connectedTransport.GetAndSet(null);
+ if (transport == null)
+ {
+ // sync with possible in progress reconnect
+ lock(reconnectMutex)
+ {
+ transport = connectedTransport.GetAndSet(null);
+ }
+ }
+
+ if(transport != null)
+ {
+ DisposeTransport(transport);
+
+ bool reconnectOk = false;
+ lock(reconnectMutex)
+ {
+ if (CanReconnect())
+ {
+ Tracer.WarnFormat("Transport failed to {0}, attempting to automatically reconnect due to: {1}",
+ ConnectedTransportURI, e.Message);
+ reconnectOk = true;
+ }
+
+ initialized = false;
+ failedConnectTransportURI = ConnectedTransportURI;
+ ConnectedTransportURI = null;
+ connectedToPriority = false;
+ connected = false;
+
+ if (reconnectOk)
+ {
+ if(this.Interrupted != null)
+ {
+ this.Interrupted(transport);
+ }
+
+ updated.Remove(failedConnectTransportURI);
+ reconnectTask.Wakeup();
+ }
+ else if (!disposed)
+ {
+ PropagateFailureToExceptionListener(e);
+ }
+ }
+ }
+ }
+
+ private bool CanReconnect()
+ {
+ return started && 0 != CalculateReconnectAttemptLimit();
+ }
+
+ public void Start()
+ {
+ lock(reconnectMutex)
+ {
+ if(started)
+ {
+ Tracer.Debug("FailoverTransport Already Started.");
+ return;
+ }
+
+ Tracer.Debug("FailoverTransport Started.");
+ started = true;
+ stateTracker.MaxCacheSize = MaxCacheSize;
+ stateTracker.TrackMessages = TrackMessages;
+ stateTracker.TrackTransactionProducers = TrackTransactionProducers;
+ if(ConnectedTransport != null)
+ {
+ Tracer.Debug("FailoverTransport already connected, start is restoring.");
+ stateTracker.DoRestore(ConnectedTransport);
+ }
+ else
+ {
+ Tracer.Debug("FailoverTransport not connected, start is reconnecting.");
+ Reconnect(false);
+ }
+ }
+ }
+
+ public virtual void Stop()
+ {
+ ITransport transportToStop = null;
+ List<ITransport> backupsToStop = new List<ITransport>(backups.Count);
+
+ try
+ {
+ lock(reconnectMutex)
+ {
+ if(!started)
+ {
+ Tracer.Debug("FailoverTransport Already Stopped.");
+ return;
+ }
+
+ Tracer.Debug("FailoverTransport Stopped.");
+ started = false;
+ disposed = true;
+ connected = false;
+ if(ConnectedTransport != null)
+ {
+ transportToStop = connectedTransport.GetAndSet(null);
+ }
+
+ }
+ lock(sleepMutex)
+ {
+ Monitor.PulseAll(sleepMutex);
+ }
+ }
+ finally
+ {
+ if(reconnectTask != null)
+ {
+ reconnectTask.Shutdown();
+ }
+ }
+
+ lock(backupMutex)
+ {
+ foreach (BackupTransport backup in backups)
+ {
+ backup.Disposed = true;
+ ITransport transport = backup.Transport;
+ if (transport != null)
+ {
+ transport.Command = DisposedOnCommand;
+ transport.Exception = DisposedOnException;
+ backupsToStop.Add(transport);
+ }
+ }
+ backups.Clear();
+ }
+
+ foreach (ITransport transport in backupsToStop)
+ {
+ try
+ {
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("Stopped backup: " + transport);
+ }
+ DisposeTransport(transport);
+ }
+ catch (Exception)
+ {
+ }
+ }
+
+ if(transportToStop != null)
+ {
+ transportToStop.Stop();
+ }
+ }
+
+ public FutureResponse AsyncRequest(Command command)
+ {
+ throw new ApplicationException("FailoverTransport does not implement AsyncRequest(Command)");
+ }
+
+ public Response Request(Command command)
+ {
+ throw new ApplicationException("FailoverTransport does not implement Request(Command)");
+ }
+
+ public Response Request(Command command, TimeSpan ts)
+ {
+ throw new ApplicationException("FailoverTransport does not implement Request(Command, TimeSpan)");
+ }
+
+ public void OnCommand(ITransport sender, Command command)
+ {
+ if(command != null)
+ {
+ if(command.IsResponse)
+ {
+ Command request = null;
+ lock(((ICollection) requestMap).SyncRoot)
+ {
+ int v = ((Response) command).CorrelationId;
+ try
+ {
+ if(requestMap.TryGetValue(v, out request))
+ {
+ requestMap.Remove(v);
+ }
+ }
+ catch
+ {
+ }
+ }
+
+ Tracked tracked = request as Tracked;
+ if(tracked != null)
{
- stateTracker.Track(command);
- // Simulate response to RemoveInfo command or a MessageAck
- // since it would be stale at this point.
- if(command.ResponseRequired)
- {
- OnCommand(this, new Response() { CorrelationId = command.CommandId });
- }
- return;
- }
- else if(command.IsMessagePull)
- {
- // Simulate response to MessagePull if timed as we can't honor that now.
- MessagePull pullRequest = command as MessagePull;
- if (pullRequest.Timeout != 0)
- {
- MessageDispatch dispatch = new MessageDispatch();
- dispatch.ConsumerId = pullRequest.ConsumerId;
- dispatch.Destination = pullRequest.Destination;
- OnCommand(this, dispatch);
- }
- return;
- }
- }
-
- // Keep trying until the message is sent.
- for(int i = 0; !disposed; i++)
- {
- try
- {
- // Any Ack that was being sent when the connection dropped is now
- // stale so we don't send it here as it would cause an unmatched ack
- // on the broker side and probably prevent a consumer from getting
- // any new messages.
- if(command.IsMessageAck && i > 0)
- {
- Tracer.Debug("Inflight MessageAck being dropped as stale.");
- if(command.ResponseRequired)
- {
- OnCommand(this, new Response() { CorrelationId = command.CommandId });
- }
- return;
- }
-
- // Wait for transport to be connected.
- ITransport transport = ConnectedTransport;
- DateTime start = DateTime.Now;
+ tracked.OnResponse();
+ }
+ }
+
+ if(!initialized)
+ {
+ initialized = true;
+ }
+
+ if(command.IsConnectionControl)
+ {
+ this.HandleConnectionControl(command as ConnectionControl);
+ }
+ }
+
+ this.Command(sender, command);
+ }
+
+ public void Oneway(Command command)
+ {
+ Exception error = null;
+
+ lock(reconnectMutex)
+ {
+ if(command != null && ConnectedTransport == null)
+ {
+ if(command.IsShutdownInfo)
+ {
+ // Skipping send of ShutdownInfo command when not connected.
+ return;
+ }
+ else if(command.IsRemoveInfo || command.IsMessageAck)
+ {
+ stateTracker.Track(command);
+ // Simulate response to RemoveInfo command or a MessageAck
+ // since it would be stale at this point.
+ if(command.ResponseRequired)
+ {
+ OnCommand(this, new Response() { CorrelationId = command.CommandId });
+ }
+ return;
+ }
+ else if(command.IsMessagePull)
+ {
+ // Simulate response to MessagePull if timed as we can't honor that now.
+ MessagePull pullRequest = command as MessagePull;
+ if (pullRequest.Timeout != 0)
+ {
+ MessageDispatch dispatch = new MessageDispatch();
+ dispatch.ConsumerId = pullRequest.ConsumerId;
+ dispatch.Destination = pullRequest.Destination;
+ OnCommand(this, dispatch);
+ }
+ return;
+ }
+ }
+
+ // Keep trying until the message is sent.
+ for(int i = 0; !disposed; i++)
+ {
+ try
+ {
+ // Any Ack that was being sent when the connection dropped is now
+ // stale so we don't send it here as it would cause an unmatched ack
+ // on the broker side and probably prevent a consumer from getting
+ // any new messages.
+ if(command.IsMessageAck && i > 0)
+ {
+ Tracer.Debug("Inflight MessageAck being dropped as stale.");
+ if(command.ResponseRequired)
+ {
+ OnCommand(this, new Response() { CorrelationId = command.CommandId });
+ }
+ return;
+ }
+
+ // Wait for transport to be connected.
+ ITransport transport = ConnectedTransport;
+ DateTime start = DateTime.Now;
bool timedout = false;
- while(transport == null && !disposed && connectionFailure == null)
- {
- Tracer.Debug("Waiting for transport to reconnect.");
-
- int elapsed = (int) (DateTime.Now - start).TotalMilliseconds;
- if(this.timeout > 0 && elapsed > this.timeout)
- {
- timedout = true;
- Tracer.DebugFormat("FailoverTransport.oneway - timed out after {0} mills", elapsed);
- break;
+ TimeSpan timewait = TimeSpan.FromMilliseconds(-1);
+
+ while(transport == null && !disposed && connectionFailure == null)
+ {
+ Tracer.Debug("Waiting for transport to reconnect.");
+
+ int elapsed = (int) (DateTime.Now - start).TotalMilliseconds;
+ if(this.timeout > 0 && elapsed > this.timeout)
+ {
+ timedout = true;
+ Tracer.DebugFormat("FailoverTransport.oneway - timed out after {0} mills", elapsed);
+ break;
+ }
+
+ if(this.timeout > 0)
+ {
+ // Set the timeout for waiting to be at most 100ms past the maximum timeout length.
+ int remainingTime = (this.timeout - elapsed) + 100;
+ timewait = TimeSpan.FromMilliseconds(remainingTime);
+ }
+
+ // Release so that the reconnect task can run
+ try
+ {
+ // Wait for something. The mutex will be pulsed if we connect, or are shut down.
+ Monitor.Wait(reconnectMutex, timewait);
+ }
+ catch(ThreadInterruptedException e)
+ {
+ Tracer.DebugFormat("Interrupted: {0}", e.Message);
+ }
+
+ transport = ConnectedTransport;
+ }
+
+ if(transport == null)
+ {
+ // Previous loop may have exited due to use being disposed.
+ if(disposed)
+ {
+ error = new IOException("Transport disposed.");
+ }
+ else if(connectionFailure != null)
+ {
+ error = connectionFailure;
+ }
+ else if(timedout)
+ {
+ error = new IOException("Failover oneway timed out after " + timeout + " milliseconds.");
+ }
+ else
+ {
+ error = new IOException("Unexpected failure.");
+ }
+ break;
+ }
+
+ // If it was a request and it was not being tracked by
+ // the state tracker, then hold it in the requestMap so
+ // that we can replay it later.
+ Tracked tracked = stateTracker.Track(command);
+ lock(((ICollection) requestMap).SyncRoot)
+ {
+ if(tracked != null && tracked.WaitingForResponse)
+ {
+ requestMap.Add(command.CommandId, tracked);
+ }
+ else if(tracked == null && command.ResponseRequired)
+ {
+ requestMap.Add(command.CommandId, command);
}
+ }
+
+ // Send the message.
+ try
+ {
+ transport.Oneway(command);
+ stateTracker.TrackBack(command);
+ }
+ catch(Exception e)
+ {
+ // If the command was not tracked.. we will retry in this method
+ // otherwise we need to trigger a reconnect before returning as
+ // the transport is failed.
+ if (tracked == null)
+ {
+ // since we will retry in this method.. take it
+ // out of the request map so that it is not
+ // sent 2 times on recovery
+ if(command.ResponseRequired)
+ {
+ lock(((ICollection) requestMap).SyncRoot)
+ {
+ requestMap.Remove(command.CommandId);
+ }
+ }
+
+ // Rethrow the exception so it will handled by
+ // the outer catch
+ throw;
+ }
+ else
+ {
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("Send Oneway attempt: {0} failed: Message = {1}",
+ i, e.Message);
+ Tracer.DebugFormat("Failed Message Was: {0}", command);
+ }
+ HandleTransportFailure(e);
+ }
+ }
+
+ return;
+ }
+ catch(Exception e)
+ {
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("Send Oneway attempt: {0} failed: Message = {1}",
+ i, e.Message);
+ Tracer.DebugFormat("Failed Message Was: {0}", command);
+ }
+ HandleTransportFailure(e);
+ }
+ }
+ }
+
+ if(!disposed)
+ {
+ if(error != null)
+ {
+ throw error;
+ }
+ }
+ }
+
+ public void Add(bool rebalance, Uri[] urisToAdd)
+ {
+ bool newUri = false;
+ lock(uris)
+ {
+ foreach (Uri uri in urisToAdd)
+ {
+ if(!Contains(uri))
+ {
+ uris.Add(uri);
+ newUri = true;
+ }
+ }
+ }
+
+ if (newUri)
+ {
+ Reconnect(rebalance);
+ }
+ }
+
+ public void Add(bool rebalance, String u)
+ {
+ try
+ {
+ Add(rebalance, new Uri[] { new Uri(u) });
+ }
+ catch(Exception e)
+ {
+ Tracer.ErrorFormat("Failed to parse URI '{0}': {1}", u, e.Message);
+ }
+ }
+
+ public void Remove(bool rebalance, Uri[] u)
+ {
+ lock(uris)
+ {
+ for(int i = 0; i < u.Length; i++)
+ {
+ uris.Remove(u[i]);
+ }
+ }
+
+ Reconnect(rebalance);
+ }
+
+ public void Remove(bool rebalance, String u)
+ {
+ try
+ {
+ Remove(rebalance, new Uri[] { new Uri(u) });
+ }
+ catch(Exception e)
+ {
+ Tracer.ErrorFormat("Failed to parse URI '{0}': {1}", u, e.Message);
+ }
+ }
+
+ public void Reconnect(Uri uri)
+ {
+ Add(true, new Uri[] { uri });
+ }
+
+ public void Reconnect(bool rebalance)
+ {
+ lock(reconnectMutex)
+ {
+ if(started)
+ {
+ if (rebalance)
+ {
+ doRebalance = true;
+ }
+ Tracer.Debug("Waking up reconnect task");
+ try
+ {
+ reconnectTask.Wakeup();
+ }
+ catch (ThreadInterruptedException)
+ {
+ }
+ }
+ else
+ {
+ Tracer.Debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
+ }
+ }
+ }
+
+ private List<Uri> ConnectList
+ {
+ get
+ {
+ if (updated.Count != 0)
+ {
+ return updated;
+ }
+
+ List<Uri> l = new List<Uri>(uris);
+ bool removed = false;
+ if(failedConnectTransportURI != null)
+ {
+ removed = l.Remove(failedConnectTransportURI);
+ }
+
+ if(Randomize)
+ {
+ Shuffle(l);
+ }
+
+ if(removed)
+ {
+ l.Add(failedConnectTransportURI);
+ }
+
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("Uri connection list: {0} from: {1}",
+ PrintableUriList(l), PrintableUriList(uris));
+ }
+
+ return l;
+ }
+ }
+
+ protected void RestoreTransport(ITransport t)
+ {
+ Tracer.Info("Restoring previous transport connection.");
+ t.Start();
+
+ // Send information to the broker - informing it we are a fault tolerant client
+ t.Oneway(new ConnectionControl() { FaultTolerant = true });
+ stateTracker.DoRestore(t);
+
+ Tracer.Info("Sending queued commands...");
+ Dictionary<int, Command> tmpMap = null;
+ lock(((ICollection) requestMap).SyncRoot)
+ {
+ tmpMap = new Dictionary<int, Command>(requestMap);
+ }
+
+ foreach(Command command in tmpMap.Values)
+ {
+ if(command.IsMessageAck)
+ {
+ Tracer.Debug("Stored MessageAck being dropped as stale.");
+ OnCommand(this, new Response() { CorrelationId = command.CommandId });
+ continue;
+ }
+
+ t.Oneway(command);
+ }
+ }
+
+ public Uri RemoteAddress
+ {
+ get
+ {
+ if(ConnectedTransport != null)
+ {
+ return ConnectedTransport.RemoteAddress;
+ }
+ return null;
+ }
+ }
+
+ public Object Narrow(Type type)
+ {
+ if(this.GetType().Equals(type))
+ {
+ return this;
+ }
+ else if(ConnectedTransport != null)
+ {
+ return ConnectedTransport.Narrow(type);
+ }
+
+ return null;
+ }
+
+ private bool DoConnect()
+ {
+ lock(reconnectMutex)
+ {
+ if (disposed || connectionFailure != null)
+ {
+ Monitor.PulseAll(reconnectMutex);
+ }
+
+ if ((connectedTransport.Value != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null)
+ {
+ return false;
+ }
+ else
+ {
+ List<Uri> connectList = ConnectList;
+ if(connectList.Count == 0)
+ {
+ Failure = new NMSConnectionException("No URIs available for connection.");
+ }
+ else
+ {
+ if (doRebalance)
+ {
+ if (connectedToPriority || CompareUris(connectList[0], connectedTransportURI))
+ {
+ // already connected to first in the list, no need to rebalance
+ doRebalance = false;
+ return false;
+ }
+ else
+ {
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("Doing rebalance from: {0} to {1}",
+ connectedTransportURI, PrintableUriList(connectList));
+ }
+ try
+ {
+ ITransport current = this.connectedTransport.GetAndSet(null);
+ if (current != null)
+ {
+ DisposeTransport(current);
+ }
+ }
+ catch (Exception e)
+ {
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("Caught an exception stopping existing " +
+ "transport for rebalance {0}", e.Message);
+ }
+ }
+ }
+
+ doRebalance = false;
+ }
+
+ ResetReconnectDelay();
+
+ ITransport transport = null;
+ Uri uri = null;
+
+ // If we have a backup already waiting lets try it.
+ lock(backupMutex)
+ {
+ if ((priorityBackup || backup) && backups.Count > 0)
+ {
+ List<BackupTransport> l = new List<BackupTransport>(backups);
+ if (randomize)
+ {
+ Shuffle(l);
+ }
+ BackupTransport bt = l[0];
+ l.RemoveAt(0);
+ backups.Remove(bt);
+ transport = bt.Transport;
+ uri = bt.Uri;
+ if (priorityBackup && priorityBackupAvailable)
+ {
+ ITransport old = this.connectedTransport.GetAndSet(null);
+ if (old != null)
+ {
+ DisposeTransport(old);
+ }
+ priorityBackupAvailable = false;
+ }
+ }
+ }
+
+ // Sleep for the reconnectDelay if there's no backup and we aren't trying
+ // for the first time, or we were disposed for some reason.
+ if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed)
+ {
+ lock(sleepMutex)
+ {
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("Waiting {0} ms before attempting connection.", reconnectDelay);
+ }
+ try
+ {
+ Monitor.Wait(sleepMutex, reconnectDelay);
+ }
+ catch (ThreadInterruptedException)
+ {
+ }
+ }
+ }
+
+ IEnumerator<Uri> iter = connectList.GetEnumerator();
+ while ((transport != null || iter.MoveNext()) && (connectedTransport.Value == null && !disposed))
+ {
+ try
+ {
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("Attempting {0}th connect to: {1}",
+ connectFailures, uri);
+ }
+
+ // We could be starting with a backup and if so we wait to grab a
+ // URI from the pool until next time around.
+ if (transport == null)
+ {
+ uri = iter.Current;
+ transport = TransportFactory.CompositeConnect(uri);
+ }
+
+ transport.Command = OnCommand;
+ transport.Exception = OnException;
+ transport.Start();
+
+ if (started && !firstConnection)
+ {
+ RestoreTransport(transport);
+ }
+
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("Connection established");
+ }
+ reconnectDelay = initialReconnectDelay;
+ connectedTransportURI = uri;
+ connectedTransport.Value = transport;
+ connectedToPriority = IsPriority(connectedTransportURI);
+ Monitor.PulseAll(reconnectMutex);
+ connectFailures = 0;
+
+ // Try to wait long enough for client to init the event callbacks.
+ listenerLatch.await(TimeSpan.FromSeconds(2));
+
+ if (Resumed != null)
+ {
+ Resumed(transport);
+ }
+ else
+ {
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("transport resumed by transport listener not set");
+ }
+ }
+
+ if (firstConnection)
+ {
+ firstConnection = false;
+ Tracer.Info("Successfully connected to " + uri);
+ }
+ else
+ {
+ Tracer.Info("Successfully reconnected to " + uri);
+ }
+
+ connected = true;
+ return false;
+ }
+ catch (Exception e)
+ {
+ failure = e;
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("Connect fail to: " + uri + ", reason: " + e.Message);
+ }
+ if (transport != null)
+ {
+ try
+ {
+ transport.Stop();
+ transport = null;
+ }
+ catch (Exception ee)
+ {
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("Stop of failed transport: " + transport +
+ " failed with reason: " + ee.Message);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ int reconnectLimit = CalculateReconnectAttemptLimit();
+
+ connectFailures++;
+ if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit)
+ {
+ Tracer.ErrorFormat("Failed to connect to {0} after: {1} attempt(s)",
+ PrintableUriList(uris), connectFailures);
+ connectionFailure = failure;
+
+ // Make sure on initial startup, that the transportListener has been
+ // initialized for this instance.
+ listenerLatch.await(TimeSpan.FromSeconds(2));
+ PropagateFailureToExceptionListener(connectionFailure);
+ return false;
+ }
+ }
+
+ if(!disposed)
+ {
+ DoDelay();
+ }
+
+ return !disposed;
+ }
+
+ private bool BuildBackups()
+ {
+ lock(backupMutex)
+ {
+ if (!disposed && (backup || priorityBackup) && backups.Count < backupPoolSize)
+ {
+ List<Uri> backupList = new List<Uri>(priorityList);
+ List<Uri> connectList = ConnectList;
+ foreach(Uri uri in connectList)
+ {
+ if (!backupList.Contains(uri))
+ {
+ backupList.Add(uri);
+ }
+ }
+ foreach(BackupTransport bt in backups)
+ {
+ if(bt.Disposed)
+ {
+ backups.Remove(bt);
+ }
+ }
+
+ foreach(Uri uri in connectList)
+ {
+ if (disposed)
+ {
+ break;
+ }
+
+ if(ConnectedTransportURI != null && !ConnectedTransportURI.Equals(uri))
+ {
+ try
+ {
+ BackupTransport bt = new BackupTransport(this)
+ {
+ Uri = uri
+ };
+
+ if(!backups.Contains(bt))
+ {
+ ITransport t = TransportFactory.CompositeConnect(uri);
+ t.Command = bt.OnCommand;
+ t.Exception = bt.OnException;
+ t.Start();
+ bt.Transport = t;
+ if (priorityBackup && IsPriority(uri))
+ {
+ priorityBackupAvailable = true;
+ backups.Insert(0, bt);
+ }
+ else
+ {
+ backups.Add(bt);
+ }
+ }
+ }
+ catch(Exception e)
+ {
+ Tracer.DebugFormat("Failed to build backup: {0}", e.Message);
+ }
+ }
+
+ if(backups.Count == BackupPoolSize)
+ {
+ break;
+ }
+ }
+ }
+ }
+
+ return false;
+ }
+
+ public void ConnectionInterruptProcessingComplete(ConnectionId connectionId)
+ {
+ lock(reconnectMutex)
+ {
+ Tracer.Debug("Connection Interrupt Processing is complete for ConnectionId: " + connectionId);
+ stateTracker.ConnectionInterruptProcessingComplete(this, connectionId);
+ }
+ }
+
+ public void UpdateURIs(bool rebalance, Uri[] updatedURIs)
+ {
+ if(IsUpdateURIsSupported)
+ {
+ Dictionary<Uri, bool> copy = new Dictionary<Uri, bool>();
+ foreach(Uri uri in updated)
+ {
+ if(uri != null)
+ {
+ copy[uri] = true;
+ }
+ }
+
+ updated.Clear();
+
+ if(updatedURIs != null && updatedURIs.Length > 0)
+ {
+ Dictionary<Uri, bool> uriSet = new Dictionary<Uri, bool>();
+ for(int i = 0; i < updatedURIs.Length; i++)
+ {
+ Uri uri = updatedURIs[i];
+ if(uri != null)
+ {
+ uriSet[uri] = true;
+ }
+ }
+
+ foreach(Uri uri in uriSet.Keys)
+ {
+ if(!updated.Contains(uri))
+ {
+ updated.Add(uri);
+ }
+ }
+
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("Updated URIs list {0}", PrintableUriList(updated));
+ }
+
+ if (!(copy.Count == 0 && updated.Count == 0) && !copy.Keys.Equals(updated))
+ {
+ BuildBackups();
+ lock(reconnectMutex)
+ {
+ Reconnect(rebalance);
+ }
+ }
+ }
+ }
+ }
+
+ public void HandleConnectionControl(ConnectionControl control)
+ {
+ string reconnectStr = control.ReconnectTo;
+
+ if(reconnectStr != null)
+ {
+ reconnectStr = reconnectStr.Trim();
+ if(reconnectStr.Length > 0)
+ {
+ try
+ {
+ Uri uri = new Uri(reconnectStr);
+ if(IsReconnectSupported)
+ {
+ Tracer.Info("Reconnecting to: " + uri.OriginalString);
+ Reconnect(uri);
+ }
+ }
+ catch(Exception e)
+ {
+ Tracer.ErrorFormat("Failed to handle ConnectionControl reconnect to {0}: {1}", reconnectStr, e);
+ }
+ }
+ }
+
+ ProcessNewTransports(control.RebalanceConnection, control.ConnectedBrokers);
+ }
+
+ private void ProcessNewTransports(bool rebalance, String newTransports)
+ {
+ if(newTransports != null)
+ {
+ newTransports = newTransports.Trim();
+
+ if(newTransports.Length > 0 && IsUpdateURIsSupported)
+ {
+ List<Uri> list = new List<Uri>();
+ ProcessDelimitedUriList(newTransports, list);
+
+ if(list.Count != 0)
+ {
+ try
+ {
+ UpdateURIs(rebalance, list.ToArray());
+ }
+ catch
+ {
+ Tracer.Error("Failed to update transport URI's from: " + newTransports);
+ }
+ }
+ }
+ }
+ }
+
+ private void ProcessDelimitedUriList(String priorityUris, List<Uri> target)
+ {
+ String[] tokens = priorityUris.Split(new Char[] { ',' });
+
+ foreach(String str in tokens)
+ {
+ try
+ {
+ Uri uri = new Uri(str);
+ target.Add(uri);
+
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("Adding new Uri[{0}] to list,", uri);
+ }
+ }
+ catch (Exception e)
+ {
+ Tracer.ErrorFormat("Failed to parse broker address: {0} because of: {1}",
+ str, e.Message);
+ }
+ }
+ }
+
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ public void Dispose(bool disposing)
+ {
+ this.Stop();
+ disposed = true;
+ }
+
+ public int CompareTo(Object o)
+ {
+ if(o is FailoverTransport)
+ {
+ FailoverTransport oo = o as FailoverTransport;
+
+ return this.id - oo.id;
+ }
+ else
+ {
+ throw new ArgumentException();
+ }
+ }
+
+ public override String ToString()
+ {
+ return ConnectedTransportURI == null ? "unconnected" : ConnectedTransportURI.ToString();
+ }
+
+ internal bool IsPriority(Uri uri)
+ {
+ if (priorityBackup)
+ {
+ if (priorityList.Count > 0)
+ {
+ return priorityList.Contains(uri);
+ }
+
+ if (this.uris.Count > 0)
+ {
+ return uris[0].Equals(uri);
+ }
+ }
+ return false;
+ }
+
+ public void DisposeTransport(ITransport transport)
+ {
+ transport.Command = DisposedOnCommand;
+ transport.Exception = DisposedOnException;
+
+ try
+ {
+ transport.Stop();
+ }
+ catch (Exception e)
+ {
+ Tracer.DebugFormat("Could not stop transport: {0]. Reason: {1}", transport, e.Message);
+ }
+ }
+
+ private void ResetReconnectDelay()
+ {
+ if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY)
+ {
+ reconnectDelay = initialReconnectDelay;
+ }
+ }
+
+ private void DoDelay()
+ {
+ if (reconnectDelay > 0)
+ {
+ lock(sleepMutex)
+ {
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("Waiting {0} ms before attempting connection", reconnectDelay);
+ }
+ try
+ {
+ Monitor.Wait(sleepMutex, reconnectDelay);
+ }
+ catch (ThreadInterruptedException)
+ {
+ }
+ }
+ }
+
+ if (useExponentialBackOff)
+ {
+ // Exponential increment of reconnect delay.
+ reconnectDelay *= backOffMultiplier;
+ if (reconnectDelay > maxReconnectDelay)
+ {
+ reconnectDelay = maxReconnectDelay;
+ }
+ }
+ }
+
+ private void PropagateFailureToExceptionListener(Exception exception)
+ {
+ if (Exception != null)
+ {
+ Exception(this, exception);
+ }
+ else
+ {
+ Exception(this, new IOException());
+ }
+ Monitor.PulseAll(reconnectMutex);
+ }
+
+ private int CalculateReconnectAttemptLimit()
+ {
+ int maxReconnectValue = this.maxReconnectAttempts;
+ if (firstConnection && this.startupMaxReconnectAttempts != INFINITE)
+ {
+ maxReconnectValue = this.startupMaxReconnectAttempts;
+ }
+ return maxReconnectValue;
+ }
+
+ public void Shuffle<T>(List<T> list)
+ {
+ Random random = new Random(DateTime.Now.Millisecond);
+ int index = list.Count;
+ while (index > 1)
+ {
+ index--;
+ int k = random.Next(index + 1);
+ T value = list[k];
+ list[k] = list[index];
+ list[index] = value;
+ }
+ }
+
+ private String PrintableUriList(List<Uri> uriList)
+ {
+ if (uriList.Count == 0)
+ {
+ return "";
+ }
+
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < uriList.Count; ++i)
+ {
+ builder.Append(uriList[i]);
+ if (i < (uriList.Count - 1))
+ {
+ builder.Append(",");
+ }
+ }
+
+ return builder.ToString();
+ }
+
+ private bool CompareUris(Uri first, Uri second)
+ {
+ bool result = false;
+ if (first.Port == second.Port)
+ {
+ IPHostEntry firstAddr = null;
+ IPHostEntry secondAddr = null;
+ try
+ {
+ firstAddr = Dns.GetHostEntry(first.Host);
+ secondAddr = Dns.GetHostEntry(second.Host);
+
+ if (firstAddr.Equals(secondAddr))
+ {
+ result = true;
+ }
+ }
+ catch(Exception e)
+ {
+ if (firstAddr == null)
+ {
+ Tracer.WarnFormat("Failed to Lookup IPHostEntry for URI[{0}] : {1}", first, e);
+ }
+ else
+ {
+ Tracer.WarnFormat("Failed to Lookup IPHostEntry for URI[{0}] : {1}", second, e);
+ }
+
+ if(String.Equals(first.Host, second.Host, StringComparison.CurrentCultureIgnoreCase))
+ {
+ result = true;
+ }
+ }
+
+ }
+
+ return result;
+ }
+
+ private bool Contains(Uri newURI)
+ {
+ bool result = false;
+ foreach (Uri uri in uris)
+ {
+ if (CompareUris(newURI, uri))
+ {
+ result = true;
+ break;
+ }
+ }
- // Release so that the reconnect task can run
- try
- {
- // Wait for something. The mutex will be pulsed if we connect.
- Monitor.Wait(reconnectMutex, 100);
- }
- catch(ThreadInterruptedException e)
- {
- Tracer.DebugFormat("Interrupted: {0}", e.Message);
- }
-
- transport = ConnectedTransport;
- }
-
- if(transport == null)
- {
- // Previous loop may have exited due to use being disposed.
- if(disposed)
- {
- error = new IOException("Transport disposed.");
- }
- else if(connectionFailure != null)
- {
- error = connectionFailure;
- }
- else if(timedout)
- {
- error = new IOException("Failover oneway timed out after " + timeout + " milliseconds.");
- }
- else
- {
- error = new IOException("Unexpected failure.");
- }
- break;
- }
-
- // If it was a request and it was not being tracked by
- // the state tracker, then hold it in the requestMap so
- // that we can replay it later.
- Tracked tracked = stateTracker.Track(command);
- lock(((ICollection) requestMap).SyncRoot)
- {
- if(tracked != null && tracked.WaitingForResponse)
- {
- requestMap.Add(command.CommandId, tracked);
- }
- else if(tracked == null && command.ResponseRequired)
- {
- requestMap.Add(command.CommandId, command);
- }
- }
-
- // Send the message.
- try
- {
- transport.Oneway(command);
- stateTracker.TrackBack(command);
- }
- catch(Exception e)
- {
- // If the command was not tracked.. we will retry in this method
- // otherwise we need to trigger a reconnect before returning as
- // the transport is failed.
- if (tracked == null)
- {
- // since we will retry in this method.. take it
- // out of the request map so that it is not
- // sent 2 times on recovery
- if(command.ResponseRequired)
- {
- lock(((ICollection) requestMap).SyncRoot)
- {
- requestMap.Remove(command.CommandId);
- }
- }
-
- // Rethrow the exception so it will handled by
- // the outer catch
- throw;
- }
- else
- {
- if (Tracer.IsDebugEnabled)
- {
- Tracer.DebugFormat("Send Oneway attempt: {0} failed: Message = {1}",
- i, e.Message);
- Tracer.DebugFormat("Failed Message Was: {0}", command);
- }
- HandleTransportFailure(e);
- }
- }
-
- return;
- }
- catch(Exception e)
- {
- if (Tracer.IsDebugEnabled)
- {
- Tracer.DebugFormat("Send Oneway attempt: {0} failed: Message = {1}",
- i, e.Message);
- Tracer.DebugFormat("Failed Message Was: {0}", command);
- }
- HandleTransportFailure(e);
- }
- }
- }
-
- if(!disposed)
- {
- if(error != null)
- {
- throw error;
- }
- }
- }
-
- public void Add(bool rebalance, Uri[] urisToAdd)
- {
- bool newUri = false;
- lock(uris)
- {
- foreach (Uri uri in urisToAdd)
- {
- if(!Contains(uri))
- {
- uris.Add(uri);
- newUri = true;
- }
- }
- }
-
- if (newUri)
- {
- Reconnect(rebalance);
- }
- }
-
- public void Add(bool rebalance, String u)
- {
- try
- {
- Add(rebalance, new Uri[] { new Uri(u) });
- }
- catch(Exception e)
- {
- Tracer.ErrorFormat("Failed to parse URI '{0}': {1}", u, e.Message);
- }
- }
-
- public void Remove(bool rebalance, Uri[] u)
- {
- lock(uris)
- {
- for(int i = 0; i < u.Length; i++)
- {
- uris.Remove(u[i]);
- }
- }
-
- Reconnect(rebalance);
- }
-
- public void Remove(bool rebalance, String u)
- {
- try
- {
- Remove(rebalance, new Uri[] { new Uri(u) });
- }
- catch(Exception e)
- {
- Tracer.ErrorFormat("Failed to parse URI '{0}': {1}", u, e.Message);
- }
- }
-
- public void Reconnect(Uri uri)
- {
- Add(true, new Uri[] { uri });
- }
-
- public void Reconnect(bool rebalance)
- {
- lock(reconnectMutex)
- {
- if(started)
- {
- if (rebalance)
- {
- doRebalance = true;
- }
- Tracer.Debug("Waking up reconnect task");
- try
- {
- reconnectTask.Wakeup();
- }
- catch (ThreadInterruptedException)
- {
- }
- }
- else
- {
- Tracer.Debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
- }
- }
- }
-
- private List<Uri> ConnectList
- {
- get
- {
- if (updated.Count != 0)
- {
- return updated;
- }
-
- List<Uri> l = new List<Uri>(uris);
- bool removed = false;
- if(failedConnectTransportURI != null)
- {
- removed = l.Remove(failedConnectTransportURI);
- }
-
- if(Randomize)
- {
- Shuffle(l);
- }
-
- if(removed)
- {
- l.Add(failedConnectTransportURI);
- }
-
- if (Tracer.IsDebugEnabled)
- {
- Tracer.DebugFormat("Uri connection list: {0} from: {1}",
- PrintableUriList(l), PrintableUriList(uris));
- }
-
- return l;
- }
- }
-
- protected void RestoreTransport(ITransport t)
- {
- Tracer.Info("Restoring previous transport connection.");
- t.Start();
-
- // Send information to the broker - informing it we are a fault tolerant client
- t.Oneway(new ConnectionControl() { FaultTolerant = true });
- stateTracker.DoRestore(t);
-
- Tracer.Info("Sending queued commands...");
- Dictionary<int, Command> tmpMap = null;
- lock(((ICollection) requestMap).SyncRoot)
- {
- tmpMap = new Dictionary<int, Command>(requestMap);
- }
-
- foreach(Command command in tmpMap.Values)
- {
- if(command.IsMessageAck)
- {
- Tracer.Debug("Stored MessageAck being dropped as stale.");
- OnCommand(this, new Response() { CorrelationId = command.CommandId });
- continue;
- }
-
- t.Oneway(command);
- }
- }
-
- public Uri RemoteAddress
- {
- get
- {
- if(ConnectedTransport != null)
- {
- return ConnectedTransport.RemoteAddress;
- }
- return null;
- }
- }
-
- public Object Narrow(Type type)
- {
- if(this.GetType().Equals(type))
- {
- return this;
- }
- else if(ConnectedTransport != null)
- {
- return ConnectedTransport.Narrow(type);
- }
-
- return null;
- }
-
- private bool DoConnect()
- {
- lock(reconnectMutex)
- {
- if (disposed || connectionFailure != null)
- {
- Monitor.PulseAll(reconnectMutex);
- }
-
[... 653 lines stripped ...]