You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2008/10/24 23:10:23 UTC
svn commit: r707747 [4/4] - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk: ./ src/main/csharp/
src/main/csharp/Commands/ src/main/csharp/State/ src/main/csharp/Threads/
src/main/csharp/Transport/ src/main/csharp/Transport/Failover/ src/main/cs...
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+
+namespace Apache.NMS.ActiveMQ.Threads
+{
+ /// <summary>
+ /// Manages the thread pool for long running tasks. Long running tasks are not
+ /// always active but when they are active, they may need a few iterations of
+ /// processing for them to become idle. The manager ensures that each task is
+ /// processes but that no one task overtakes the system. This is kina like
+ /// cooperative multitasking.
+ /// </summary>
+ public class TaskRunnerFactory
+ {
+
+ private int maxIterationsPerRun;
+ private String name;
+ private ThreadPriority priority;
+ private bool daemon;
+
+ public TaskRunnerFactory()
+ {
+ initTaskRunnerFactory("ActiveMQ Task", ThreadPriority.Normal, true, 1000, false);
+ }
+
+ public TaskRunnerFactory(String name, ThreadPriority priority, bool daemon, int maxIterationsPerRun)
+ {
+ initTaskRunnerFactory(name, priority, daemon, maxIterationsPerRun, false);
+ }
+
+ public TaskRunnerFactory(String name, ThreadPriority priority, bool daemon, int maxIterationsPerRun, bool dedicatedTaskRunner)
+ {
+ initTaskRunnerFactory(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner);
+ }
+
+ public void initTaskRunnerFactory(String name, ThreadPriority priority, bool daemon, int maxIterationsPerRun, bool dedicatedTaskRunner)
+ {
+
+ this.name = name;
+ this.priority = priority;
+ this.daemon = daemon;
+ this.maxIterationsPerRun = maxIterationsPerRun;
+
+ // If your OS/JVM combination has a good thread model, you may want to avoid
+ // using a thread pool to run tasks and use a DedicatedTaskRunner instead.
+ }
+
+ public void shutdown()
+ {
+ }
+
+ public TaskRunner CreateTaskRunner(Task task, String name)
+ {
+ return new PooledTaskRunner(task, maxIterationsPerRun);
+ }
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.Transport.Failover
+{
+ class BackupTransport
+ {
+ private FailoverTransport failoverTransport;
+ private ITransport transport;
+ private Uri uri;
+ private bool disposed;
+
+ public BackupTransport(FailoverTransport ft)
+ {
+ this.failoverTransport = ft;
+ }
+
+ public void onCommand(ITransport t, Command c)
+ {
+ }
+
+ public void onException(ITransport t, Exception error)
+ {
+ this.disposed = true;
+ if(failoverTransport != null)
+ {
+ this.failoverTransport.Reconnect();
+ }
+ }
+
+ public ITransport Transport
+ {
+ get
+ {
+ return transport;
+ }
+ set
+ {
+ transport = value;
+ }
+ }
+
+ public Uri Uri
+ {
+ get
+ {
+ return uri;
+ }
+ set
+ {
+ uri = value;
+ }
+ }
+
+ public bool Disposed
+ {
+ get
+ {
+ return disposed || (transport != null && transport.IsDisposed);
+ }
+ set
+ {
+ disposed = value;
+ }
+ }
+
+ public int hashCode()
+ {
+ return uri != null ? uri.GetHashCode() : -1;
+ }
+
+ public bool equals(Object obj)
+ {
+ if(obj is BackupTransport)
+ {
+ BackupTransport other = obj as BackupTransport;
+ return uri == null && other.uri == null ||
+ (uri != null && other.uri != null && uri.Equals(other.uri));
+ }
+ return false;
+ }
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,1142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.State;
+using Apache.NMS.ActiveMQ.Threads;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Failover
+{
+ /// <summary>
+ /// A Transport that is made reliable by being able to fail over to another
+ /// transport when a transport failure is detected.
+ /// </summary>
+ public class FailoverTransport : ICompositeTransport, IComparable
+ {
+
+ private static int _idCounter = 0;
+ private int _id;
+
+ private bool disposed;
+ private bool connected;
+ private List<Uri> uris = new List<Uri>();
+ private CommandHandler _commandHandler;
+ private ExceptionHandler _exceptionHandler;
+
+ private Mutex reconnectMutex = new Mutex();
+ private Mutex backupMutex = new Mutex();
+ private Mutex sleepMutex = new Mutex();
+ private Mutex listenerMutex = new Mutex();
+ private ConnectionStateTracker stateTracker = new ConnectionStateTracker();
+ private Dictionary<int, Command> requestMap = new Dictionary<int, Command>();
+
+ private Uri connectedTransportURI;
+ private Uri failedConnectTransportURI;
+ private AtomicReference<ITransport> connectedTransport = new AtomicReference<ITransport>(null);
+ private TaskRunner reconnectTask = null;
+ private bool started;
+
+ private int _initialReconnectDelay = 10;
+ private int _maxReconnectDelay = 1000 * 30;
+ private int _backOffMultiplier = 2;
+ private bool _useExponentialBackOff = true;
+ private bool _randomize = true;
+ private bool initialized;
+ private int _maxReconnectAttempts;
+ private int connectFailures;
+ private int _reconnectDelay = 10;
+ private Exception connectionFailure;
+ private bool firstConnection = true;
+ //optionally always have a backup created
+ private bool _backup = false;
+ private List<BackupTransport> backups = new List<BackupTransport>();
+ private int _backupPoolSize = 1;
+ private bool _trackMessages = false;
+ private int _maxCacheSize = 256;
+ private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
+
+ public TimeSpan RequestTimeout
+ {
+ get
+ {
+ return requestTimeout;
+ }
+ set
+ {
+ requestTimeout = value;
+ }
+ }
+
+ private class FailoverTask : Task
+ {
+ private FailoverTransport parent;
+
+ public FailoverTask(FailoverTransport p)
+ {
+ parent = p;
+ }
+
+ public bool iterate()
+ {
+ bool result = false;
+ bool buildBackup = true;
+ bool doReconnect = !parent.disposed;
+ try
+ {
+ parent.backupMutex.WaitOne();
+ if(parent.ConnectedTransport == null && !parent.disposed)
+ {
+ result = parent.doReconnect();
+ buildBackup = false;
+ }
+ }
+ finally
+ {
+ parent.backupMutex.ReleaseMutex();
+ }
+ if(buildBackup)
+ {
+ parent.buildBackups();
+ }
+ else
+ {
+ //build backups on the next iteration
+ result = true;
+ try
+ {
+ parent.reconnectTask.wakeup();
+ }
+ catch(ThreadInterruptedException e)
+ {
+ e.GetType();
+ Tracer.Debug("Reconnect task has been interrupted.");
+ }
+ }
+ return result;
+ }
+ }
+
+ public FailoverTransport()
+ {
+ _id = _idCounter++;
+
+ stateTracker.TrackTransactions = true;
+ }
+
+ ~FailoverTransport()
+ {
+ Dispose(false);
+ }
+
+ public void onCommand(ITransport sender, Command command)
+ {
+ if(command != null)
+ {
+ if(command.IsResponse)
+ {
+ Object oo = null;
+ lock(requestMap)
+ {
+ int v = ((Response) command).CorrelationId;
+ try
+ {
+ oo = requestMap[v];
+ requestMap.Remove(v);
+ }
+ catch
+ {
+ }
+ }
+ Tracked t = oo as Tracked;
+ if(t != null)
+ {
+ t.onResponses();
+ }
+ }
+ if(!initialized)
+ {
+ if(command.IsBrokerInfo)
+ {
+ BrokerInfo info = (BrokerInfo) command;
+ BrokerInfo[] peers = info.PeerBrokerInfos;
+ if(peers != null)
+ {
+ for(int i = 0; i < peers.Length; i++)
+ {
+ String brokerString = peers[i].BrokerURL;
+ add(brokerString);
+ }
+ }
+ initialized = true;
+ }
+ }
+ }
+ this.Command(sender, command);
+ }
+
+ public void onException(ITransport sender, Exception error)
+ {
+ try
+ {
+ handleTransportFailure(error);
+ }
+ catch(Exception e)
+ {
+ e.GetType();
+ // What to do here?
+ }
+ }
+
+ public void disposedOnCommand(ITransport sender, Command c)
+ {
+ }
+
+ public void disposedOnException(ITransport sender, Exception e)
+ {
+ }
+
+ public void handleTransportFailure(Exception e)
+ {
+ ITransport transport = connectedTransport.GetAndSet(null);
+ if(transport != null)
+ {
+ transport.Command = new CommandHandler(disposedOnCommand);
+ transport.Exception = new ExceptionHandler(disposedOnException);
+ try
+ {
+ transport.Stop();
+ }
+ catch(Exception ex)
+ {
+ ex.GetType(); // Ignore errors but this lets us see the error during debugging
+ }
+
+ try
+ {
+ reconnectMutex.WaitOne();
+ bool reconnectOk = false;
+ if(started)
+ {
+ Tracer.Warn("Transport failed to " + ConnectedTransportURI + " , attempting to automatically reconnect due to: " + e.Message);
+ Tracer.Debug("Transport failed with the following exception:" + e.Message);
+ reconnectOk = true;
+ }
+
+ initialized = false;
+ failedConnectTransportURI = ConnectedTransportURI;
+ ConnectedTransportURI = null;
+ connected = false;
+ if(reconnectOk)
+ {
+ reconnectTask.wakeup();
+ }
+ }
+ finally
+ {
+ reconnectMutex.ReleaseMutex();
+ }
+ }
+ }
+
+ public void Start()
+ {
+ try
+ {
+ reconnectMutex.WaitOne();
+ Tracer.Debug("Started.");
+ if(started)
+ {
+ return;
+ }
+ started = true;
+ stateTracker.MaxCacheSize = MaxCacheSize;
+ stateTracker.TrackMessages = TrackMessages;
+ if(ConnectedTransport != null)
+ {
+ stateTracker.DoRestore(ConnectedTransport);
+ }
+ else
+ {
+ Reconnect();
+ }
+ }
+ finally
+ {
+ reconnectMutex.ReleaseMutex();
+ }
+ }
+
+ public virtual void Stop()
+ {
+ ITransport transportToStop = null;
+ try
+ {
+ reconnectMutex.WaitOne();
+ Tracer.Debug("Stopped.");
+ if(!started)
+ {
+ return;
+ }
+ started = false;
+ disposed = true;
+ connected = false;
+ foreach(BackupTransport t in backups)
+ {
+ t.Disposed = true;
+ }
+ backups.Clear();
+
+ if(ConnectedTransport != null)
+ {
+ transportToStop = connectedTransport.GetAndSet(null);
+ }
+ }
+ finally
+ {
+ reconnectMutex.ReleaseMutex();
+ }
+ try
+ {
+ sleepMutex.WaitOne();
+ }
+ finally
+ {
+ sleepMutex.ReleaseMutex();
+ }
+ reconnectTask.shutdown();
+ if(transportToStop != null)
+ {
+ transportToStop.Stop();
+ }
+ }
+
+ public int InitialReconnectDelay
+ {
+ get
+ {
+ return _initialReconnectDelay;
+ }
+ set
+ {
+ _initialReconnectDelay = value;
+ }
+ }
+
+ public int MaxReconnectDelay
+ {
+ get
+ {
+ return _maxReconnectDelay;
+ }
+ set
+ {
+ _maxReconnectDelay = value;
+ }
+ }
+
+ public int ReconnectDelay
+ {
+ get
+ {
+ return _reconnectDelay;
+ }
+ set
+ {
+ _reconnectDelay = value;
+ }
+ }
+
+ public int ReconnectDelayExponent
+ {
+ get
+ {
+ return _backOffMultiplier;
+ }
+ set
+ {
+ _backOffMultiplier = value;
+ }
+ }
+
+ public ITransport ConnectedTransport
+ {
+ get
+ {
+ return connectedTransport.Value;
+ }
+ set
+ {
+ connectedTransport.Value = value;
+ }
+ }
+
+ public Uri ConnectedTransportURI
+ {
+ get
+ {
+ return connectedTransportURI;
+ }
+ set
+ {
+ connectedTransportURI = value;
+ }
+ }
+
+ public int MaxReconnectAttempts
+ {
+ get
+ {
+ return _maxReconnectAttempts;
+ }
+ set
+ {
+ _maxReconnectAttempts = value;
+ }
+ }
+
+ public bool Randomize
+ {
+ get
+ {
+ return _randomize;
+ }
+ set
+ {
+ _randomize = value;
+ }
+ }
+
+ public bool Backup
+ {
+ get
+ {
+ return _backup;
+ }
+ set
+ {
+ _backup = value;
+ }
+ }
+
+ public int BackupPoolSize
+ {
+ get
+ {
+ return _backupPoolSize;
+ }
+ set
+ {
+ _backupPoolSize = value;
+ }
+ }
+
+ public bool TrackMessages
+ {
+ get
+ {
+ return _trackMessages;
+ }
+ set
+ {
+ _trackMessages = value;
+ }
+ }
+
+ public int MaxCacheSize
+ {
+ get
+ {
+ return _maxCacheSize;
+ }
+ set
+ {
+ _maxCacheSize = value;
+ }
+ }
+
+ /// <summary>
+ /// </summary>
+ /// <param name="command"></param>
+ /// <returns>Returns true if the command is one sent when a connection is being closed.</returns>
+ private bool IsShutdownCommand(Command command)
+ {
+ return (command != null && (command.IsShutdownInfo || command is RemoveInfo));
+ }
+
+ public void Oneway(Command command)
+ {
+ Exception error = null;
+ try
+ {
+
+ try
+ {
+ reconnectMutex.WaitOne();
+
+ if(IsShutdownCommand(command) && ConnectedTransport == null)
+ {
+ if(command.IsShutdownInfo)
+ {
+ // Skipping send of ShutdownInfo command when not connected.
+ return;
+ }
+ if(command is RemoveInfo)
+ {
+ // Simulate response to RemoveInfo command
+ Response response = new Response();
+ response.CorrelationId = command.CommandId;
+ onCommand(this, response);
+ return;
+ }
+ }
+ // Keep trying until the message is sent.
+ for(int i = 0; !disposed; i++)
+ {
+ try
+ {
+ // Wait for transport to be connected.
+ ITransport transport = ConnectedTransport;
+ while(transport == null && !disposed
+ && connectionFailure == null
+ // && !Thread.CurrentThread.isInterrupted()
+ )
+ {
+ Tracer.Info("Waiting for transport to reconnect.");
+ try
+ {
+ // Release so that the reconnect task can run
+ reconnectMutex.ReleaseMutex();
+ try
+ {
+ // Wait for something
+ Thread.Sleep(1000);
+ }
+ catch(ThreadInterruptedException e)
+ {
+ Thread.CurrentThread.Interrupt(); // KILROY - not needed
+ Tracer.Debug("Interupted: " + e);
+ }
+ }
+ finally
+ {
+ reconnectMutex.WaitOne();
+ }
+ transport = ConnectedTransport;
+ }
+
+ if(transport == null)
+ {
+ // Previous loop may have exited due to use being
+ // disposed.
+ if(disposed)
+ {
+ error = new IOException("Transport disposed.");
+ }
+ else if(connectionFailure != null)
+ {
+ error = connectionFailure;
+ }
+ else
+ {
+ error = new IOException("Unexpected failure.");
+ }
+ break;
+ }
+
+ // If it was a request and it was not being tracked by
+ // the state tracker,
+ // then hold it in the requestMap so that we can replay
+ // it later.
+ Tracked tracked = stateTracker.track(command);
+ lock(requestMap)
+ {
+ if(tracked != null && tracked.WaitingForResponse)
+ {
+ requestMap.Add(command.CommandId, tracked);
+ }
+ else if(tracked == null && command.ResponseRequired)
+ {
+ requestMap.Add(command.CommandId, command);
+ }
+ }
+
+ // Send the message.
+ try
+ {
+ transport.Oneway(command);
+ stateTracker.trackBack(command);
+ }
+ catch(Exception e)
+ {
+
+ // If the command was not tracked.. we will retry in
+ // this method
+ if(tracked == null)
+ {
+
+ // since we will retry in this method.. take it
+ // out of the request
+ // map so that it is not sent 2 times on
+ // recovery
+ if(command.ResponseRequired)
+ {
+ requestMap.Remove(command.CommandId);
+ }
+
+ // Rethrow the exception so it will handled by
+ // the outer catch
+ throw e;
+ }
+
+ }
+
+ return;
+
+ }
+ catch(Exception e)
+ {
+ Tracer.Debug("Send Oneway attempt: " + i + " failed.");
+ handleTransportFailure(e);
+ }
+ }
+ }
+ finally
+ {
+ reconnectMutex.ReleaseMutex();
+ }
+ }
+ catch(ThreadInterruptedException e)
+ {
+ e.GetType();
+ // Some one may be trying to stop our thread.
+ Thread.CurrentThread.Interrupt();
+ throw new ThreadInterruptedException();
+ }
+ if(!disposed)
+ {
+ if(error != null)
+ {
+ throw error;
+ }
+ }
+ }
+
+ /*
+ public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) {
+ throw new AssertionError("Unsupported Method");
+ }
+ */
+
+ public Object Request(Object command)
+ {
+ throw new ApplicationException("FailoverTransport does not support Request(Object)");
+ }
+
+ public Object Request(Object command, int timeout)
+ {
+ throw new ApplicationException("FailoverTransport does not support Request(Object, Int)");
+ }
+
+ public void add(Uri[] u)
+ {
+ lock(uris)
+ {
+ for(int i = 0; i < u.Length; i++)
+ {
+ if(!uris.Contains(u[i]))
+ {
+ uris.Add(u[i]);
+ }
+ }
+ }
+ Reconnect();
+ }
+
+ public void remove(Uri[] u)
+ {
+ lock(uris)
+ {
+ for(int i = 0; i < u.Length; i++)
+ {
+ uris.Remove(u[i]);
+ }
+ }
+ Reconnect();
+ }
+
+ public void add(String u)
+ {
+ try
+ {
+ Uri uri = new Uri(u);
+ lock(uris)
+ {
+ if(!uris.Contains(uri))
+ {
+ uris.Add(uri);
+ }
+ }
+
+ Reconnect();
+ }
+ catch(Exception e)
+ {
+ Tracer.Error("Failed to parse URI: " + u + " because " + e.Message);
+ }
+ }
+
+ public void Reconnect()
+ {
+ try
+ {
+ reconnectMutex.WaitOne();
+
+ if(started)
+ {
+ if(reconnectTask == null)
+ {
+ Tracer.Debug("Creating reconnect task");
+ reconnectTask = DefaultThreadPools.DefaultTaskRunnerFactory.CreateTaskRunner(new FailoverTask(this),
+ "ActiveMQ Failover Worker: " + this.GetHashCode().ToString());
+ }
+
+ Tracer.Debug("Waking up reconnect task");
+ try
+ {
+ reconnectTask.wakeup();
+ }
+ catch(ThreadInterruptedException e)
+ {
+ e.GetType(); // Suppress warning
+ Thread.CurrentThread.Interrupt();
+ }
+ }
+ else
+ {
+ Tracer.Debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
+ }
+ }
+ finally
+ {
+ reconnectMutex.ReleaseMutex();
+ }
+ }
+
+ private List<Uri> ConnectList
+ {
+ get
+ {
+ List<Uri> l = new List<Uri>(uris);
+ bool removed = false;
+ if(failedConnectTransportURI != null)
+ {
+ removed = l.Remove(failedConnectTransportURI);
+ }
+ if(Randomize)
+ {
+ // Randomly, reorder the list by random swapping
+ Random r = new Random(DateTime.Now.Millisecond);
+ for(int i = 0; i < l.Count; i++)
+ {
+ int p = r.Next(l.Count);
+ Uri t = l[p];
+ l[p] = l[i];
+ l[i] = t;
+ }
+ }
+ if(removed)
+ {
+ l.Add(failedConnectTransportURI);
+ }
+ return l;
+ }
+ }
+
+ protected void restoreTransport(ITransport t)
+ {
+ t.Start();
+ //send information to the broker - informing it we are an ft client
+ ConnectionControl cc = new ConnectionControl();
+ cc.FaultTolerant = true;
+ t.Oneway(cc);
+ stateTracker.DoRestore(t);
+ Dictionary<int, Command> tmpMap = null;
+ lock(requestMap)
+ {
+ tmpMap = new Dictionary<int, Command>(requestMap);
+ }
+ foreach(Command command in tmpMap.Values)
+ {
+ t.Oneway(command);
+ }
+ }
+
+ public bool UseExponentialBackOff
+ {
+ get
+ {
+ return _useExponentialBackOff;
+ }
+ set
+ {
+ _useExponentialBackOff = value;
+ }
+ }
+
+ public override String ToString()
+ {
+ return ConnectedTransportURI == null ? "unconnected" : ConnectedTransportURI.ToString();
+ }
+
+ public String RemoteAddress
+ {
+ get
+ {
+ FailoverTransport transport = ConnectedTransport as FailoverTransport;
+ if(transport != null)
+ {
+ return transport.RemoteAddress;
+ }
+ return null;
+ }
+ }
+
+ public bool IsFaultTolerant
+ {
+ get
+ {
+ return true;
+ }
+ }
+
+ bool doReconnect()
+ {
+ Exception failure = null;
+ try
+ {
+ reconnectMutex.WaitOne();
+
+ if(disposed || connectionFailure != null)
+ {
+ }
+
+ if(ConnectedTransport != null || disposed || connectionFailure != null)
+ {
+ return false;
+ }
+ else
+ {
+ List<Uri> connectList = ConnectList;
+ if(connectList.Count == 0)
+ {
+ failure = new IOException("No uris available to connect to.");
+ }
+ else
+ {
+ if(!UseExponentialBackOff)
+ {
+ ReconnectDelay = InitialReconnectDelay;
+ }
+ try
+ {
+ backupMutex.WaitOne();
+ if(Backup && backups.Count != 0)
+ {
+ BackupTransport bt = backups[0];
+ backups.RemoveAt(0);
+ ITransport t = bt.Transport;
+ Uri uri = bt.Uri;
+ t.Command = new CommandHandler(onCommand);
+ t.Exception = new ExceptionHandler(onException);
+ try
+ {
+ if(started)
+ {
+ restoreTransport(t);
+ }
+ ReconnectDelay = InitialReconnectDelay;
+ failedConnectTransportURI = null;
+ ConnectedTransportURI = uri;
+ ConnectedTransport = t;
+ connectFailures = 0;
+ Tracer.Info("Successfully reconnected to backup " + uri);
+ return false;
+ }
+ catch(Exception e)
+ {
+ e.GetType();
+ Tracer.Debug("Backup transport failed");
+ }
+ }
+ }
+ finally
+ {
+ backupMutex.ReleaseMutex();
+ }
+
+ foreach(Uri uri in connectList)
+ {
+ if(ConnectedTransport != null || disposed)
+ {
+ break;
+ }
+
+ try
+ {
+ Tracer.Debug("Attempting connect to: " + uri);
+ ITransport t = TransportFactory.CompositeConnect(uri);
+ t.Command = new CommandHandler(onCommand);
+ t.Exception = new ExceptionHandler(onException);
+ t.Start();
+
+ if(started)
+ {
+ restoreTransport(t);
+ }
+
+ Tracer.Debug("Connection established");
+ ReconnectDelay = InitialReconnectDelay;
+ ConnectedTransportURI = uri;
+ ConnectedTransport = t;
+ connectFailures = 0;
+
+ if(firstConnection)
+ {
+ firstConnection = false;
+ Tracer.Info("Successfully connected to " + uri);
+ }
+ else
+ {
+ Tracer.Info("Successfully reconnected to " + uri);
+ }
+ connected = true;
+ return false;
+ }
+ catch(Exception e)
+ {
+ failure = e;
+ Tracer.Debug("Connect fail to: " + uri + ", reason: " + e);
+ }
+ }
+ }
+ }
+
+ if(MaxReconnectAttempts > 0 && ++connectFailures >= MaxReconnectAttempts)
+ {
+ Tracer.Error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
+ connectionFailure = failure;
+
+ onException(this, connectionFailure);
+
+ return false;
+ }
+ }
+ finally
+ {
+ reconnectMutex.ReleaseMutex();
+ }
+ if(!disposed)
+ {
+
+ Tracer.Debug("Waiting " + ReconnectDelay + " ms before attempting connection. ");
+ try
+ {
+ sleepMutex.WaitOne();
+ try
+ {
+ Thread.Sleep(ReconnectDelay);
+ }
+ catch(ThreadInterruptedException e)
+ {
+ e.GetType();
+ Thread.CurrentThread.Interrupt();
+ }
+ }
+ finally
+ {
+ sleepMutex.ReleaseMutex();
+ }
+
+ if(UseExponentialBackOff)
+ {
+ // Exponential increment of reconnect delay.
+ ReconnectDelay *= ReconnectDelayExponent;
+ if(ReconnectDelay > MaxReconnectDelay)
+ {
+ ReconnectDelay = MaxReconnectDelay;
+ }
+ }
+ }
+ return !disposed;
+ }
+
+
+ bool buildBackups()
+ {
+ try
+ {
+ backupMutex.WaitOne();
+ if(!disposed && Backup && backups.Count < BackupPoolSize)
+ {
+ List<Uri> connectList = ConnectList;
+ foreach(BackupTransport bt in backups)
+ {
+ if(bt.Disposed)
+ {
+ backups.Remove(bt);
+ }
+ }
+ foreach(Uri uri in connectList)
+ {
+ if(ConnectedTransportURI != null && !ConnectedTransportURI.Equals(uri))
+ {
+ try
+ {
+ BackupTransport bt = new BackupTransport(this);
+ bt.Uri = uri;
+ if(!backups.Contains(bt))
+ {
+ ITransport t = TransportFactory.CompositeConnect(uri);
+ t.Command = new CommandHandler(bt.onCommand);
+ t.Exception = new ExceptionHandler(bt.onException);
+ t.Start();
+ bt.Transport = t;
+ backups.Add(bt);
+ }
+ }
+ catch(Exception e)
+ {
+ e.GetType();
+ Tracer.Debug("Failed to build backup ");
+ }
+ }
+ if(backups.Count < BackupPoolSize)
+ {
+ break;
+ }
+ }
+ }
+ }
+ finally
+ {
+ backupMutex.ReleaseMutex();
+ }
+ return false;
+ }
+
+ public bool IsDisposed
+ {
+ get
+ {
+ return disposed;
+ }
+ }
+
+ public bool Connected
+ {
+ get
+ {
+ return connected;
+ }
+ }
+
+ public void Reconnect(Uri uri)
+ {
+ add(new Uri[] { uri });
+ }
+
+ public FutureResponse AsyncRequest(Command command)
+ {
+ throw new ApplicationException("FailoverTransport does not implement AsyncRequest(Command)");
+ }
+
+ public Response Request(Command command)
+ {
+ throw new ApplicationException("FailoverTransport does not implement Request(Command)");
+ }
+
+ public Response Request(Command command, TimeSpan ts)
+ {
+ throw new ApplicationException("FailoverTransport does not implement Request(Command, TimeSpan)");
+ }
+
+ public CommandHandler Command
+ {
+ get
+ {
+ return _commandHandler;
+ }
+ set
+ {
+ _commandHandler = value;
+ }
+ }
+
+ public ExceptionHandler Exception
+ {
+ get
+ {
+ return _exceptionHandler;
+ }
+ set
+ {
+ _exceptionHandler = value;
+ }
+ }
+
+ public bool IsStarted
+ {
+ get
+ {
+ return started;
+ }
+ }
+
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ public void Dispose(bool disposing)
+ {
+ if(disposing)
+ {
+ // get rid of unmanaged stuff
+ }
+ disposed = true;
+ }
+
+ public int CompareTo(Object o)
+ {
+ if(o is FailoverTransport)
+ {
+ FailoverTransport oo = o as FailoverTransport;
+
+ return this._id - oo._id;
+ }
+ else
+ {
+ throw new ArgumentException();
+ }
+ }
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Specialized;
+
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Failover
+{
+ public class FailoverTransportFactory : ITransportFactory
+ {
+ private ITransport wrapTransport(ITransport transport)
+ {
+ transport = new MutexTransport(transport);
+ transport = new ResponseCorrelator(transport);
+ return transport;
+ }
+
+ private ITransport doConnect(Uri location)
+ {
+ ITransport transport = CreateTransport(URISupport.parseComposite(location));
+ return wrapTransport(transport);
+ }
+
+ public ITransport CompositeConnect(Uri location)
+ {
+ return CreateTransport(URISupport.parseComposite(location));
+ }
+
+ public ITransport CreateTransport(Uri location)
+ {
+ return doConnect(location);
+ }
+
+ /// <summary>
+ /// </summary>
+ /// <param name="compositData"></param>
+ /// <returns></returns>
+ public ITransport CreateTransport(URISupport.CompositeData compositData)
+ {
+ StringDictionary options = compositData.Parameters;
+ FailoverTransport transport = CreateTransport(options);
+ transport.add(compositData.Components);
+ return transport;
+ }
+
+ public FailoverTransport CreateTransport(StringDictionary parameters)
+ {
+ FailoverTransport transport = new FailoverTransport();
+ URISupport.SetProperties(transport, parameters, "");
+ return transport;
+ }
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.ActiveMQ.Transport
+{
+ public interface ICompositeTransport : ITransport
+ {
+ void add(Uri[] uris);
+ void remove(Uri[] uris);
+ }
+}
+
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs Fri Oct 24 14:10:22 2008
@@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using Apache.NMS.ActiveMQ.Commands;
-using Apache.NMS;
+
using System;
+using Apache.NMS.ActiveMQ.Commands;
namespace Apache.NMS.ActiveMQ.Transport
{
@@ -26,11 +26,11 @@
/// <summary>
/// Represents the logical networking transport layer.
/// </summary>
- public interface ITransport : IStartable, IDisposable
- {
- void Oneway(Command command);
-
- FutureResponse AsyncRequest(Command command);
+ public interface ITransport : IStartable, IDisposable, IStoppable
+ {
+ void Oneway(Command command);
+
+ FutureResponse AsyncRequest(Command command);
TimeSpan RequestTimeout
{
@@ -38,20 +38,25 @@
set;
}
- Response Request(Command command);
+ Response Request(Command command);
Response Request(Command command, TimeSpan timeout);
-
- CommandHandler Command
+
+ CommandHandler Command
+ {
+ get;
+ set;
+ }
+
+ ExceptionHandler Exception
{
- get;
- set;
- }
-
- ExceptionHandler Exception
+ get;
+ set;
+ }
+
+ bool IsDisposed
{
- get;
- set;
- }
- }
+ get;
+ }
+ }
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransportFactory.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransportFactory.cs Fri Oct 24 14:10:22 2008
@@ -20,7 +20,8 @@
namespace Apache.NMS.ActiveMQ.Transport
{
public interface ITransportFactory
- {
+ {
ITransport CreateTransport(Uri location);
+ ITransport CompositeConnect(Uri location);
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs Fri Oct 24 14:10:22 2008
@@ -14,14 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using Apache.NMS.ActiveMQ.Commands;
-using Apache.NMS.ActiveMQ.OpenWire;
-using Apache.NMS.ActiveMQ.Transport;
-using Apache.NMS.Util;
+
using System;
using System.IO;
using System.Net.Sockets;
using System.Threading;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.OpenWire;
+using Apache.NMS.Util;
namespace Apache.NMS.ActiveMQ.Transport.Tcp
{
@@ -37,6 +37,7 @@
private BinaryWriter socketWriter;
private Thread readThread;
private bool started;
+ private bool disposed = false;
private AtomicBoolean closed = new AtomicBoolean(false);
private volatile bool seenShutdown;
private TimeSpan maxWait = TimeSpan.FromMilliseconds(Timeout.Infinite);
@@ -63,15 +64,15 @@
{
lock(myLock)
{
- if (!started)
+ if(!started)
{
- if (null == commandHandler)
+ if(null == commandHandler)
{
throw new InvalidOperationException(
"command cannot be null when Start is called.");
}
- if (null == exceptionHandler)
+ if(null == exceptionHandler)
{
throw new InvalidOperationException(
"exception cannot be null when Start is called.");
@@ -179,6 +180,11 @@
throw new NotImplementedException("Use a ResponseCorrelator if you want to issue Request calls");
}
+ public void Stop()
+ {
+ Close();
+ }
+
public void Close()
{
if(closed.CompareAndSet(false, true))
@@ -235,9 +241,9 @@
{
if(Thread.CurrentThread != readThread
#if !NETCF
- && readThread.IsAlive
+ && readThread.IsAlive
#endif
- )
+)
{
TimeSpan waitTime;
@@ -273,6 +279,15 @@
protected void Dispose(bool disposing)
{
Close();
+ disposed = true;
+ }
+
+ public bool IsDisposed
+ {
+ get
+ {
+ return disposed;
+ }
}
public void ReadLoop()
@@ -337,7 +352,7 @@
set { this.commandHandler = value; }
}
- public ExceptionHandler Exception
+ public ExceptionHandler Exception
{
get { return exceptionHandler; }
set { this.exceptionHandler = value; }
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs Fri Oct 24 14:10:22 2008
@@ -16,14 +16,12 @@
*/
using System;
+using System.Collections.Specialized;
using System.Net;
using System.Net.Sockets;
-using System.Collections.Specialized;
using Apache.NMS.ActiveMQ.OpenWire;
using Apache.NMS.ActiveMQ.Transport.Stomp;
-using Apache.NMS;
using Apache.NMS.Util;
-using System.Threading;
namespace Apache.NMS.ActiveMQ.Transport.Tcp
{
@@ -103,7 +101,7 @@
#region ITransportFactory Members
- public ITransport CreateTransport(Uri location)
+ public ITransport CompositeConnect(Uri location)
{
// Extract query parameters from broker Uri
StringDictionary map = URISupport.ParseQuery(location.Query);
@@ -136,6 +134,15 @@
transport = new WireFormatNegotiator(transport, (OpenWireFormat) wireformat);
}
+ transport.RequestTimeout = this.requestTimeout;
+
+ return transport;
+ }
+
+ public ITransport CreateTransport(Uri location)
+ {
+ ITransport transport = CompositeConnect(location);
+
transport = new MutexTransport(transport);
transport = new ResponseCorrelator(transport);
transport.RequestTimeout = this.requestTimeout;
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+
+using Apache.NMS.ActiveMQ.Transport.Failover;
+using Apache.NMS.ActiveMQ.Transport.Tcp;
+
+namespace Apache.NMS.ActiveMQ.Transport
+{
+ public class TransportFactory
+ {
+
+ private static Dictionary<String, ITransportFactory> TRANSPORT_FACTORYS = new Dictionary<String, ITransportFactory>();
+
+ static TransportFactory()
+ {
+ TRANSPORT_FACTORYS.Add("tcp", new TcpTransportFactory());
+ TRANSPORT_FACTORYS.Add("failover", new FailoverTransportFactory());
+ }
+
+ /// <summary>
+ /// Creates a normal transport.
+ /// </summary>
+ /// <param name="location"></param>
+ /// <returns>the transport</returns>
+ public static ITransport CreateTransport(Uri location)
+ {
+ ITransportFactory tf = findTransportFactory(location);
+ return tf.CreateTransport(location);
+ }
+
+ public static ITransport CompositeConnect(Uri location)
+ {
+ ITransportFactory tf = findTransportFactory(location);
+ return tf.CompositeConnect(location);
+ }
+
+ /// <summary>
+ /// </summary>
+ /// <param name="location"></param>
+ /// <returns></returns>
+ private static ITransportFactory findTransportFactory(Uri location)
+ {
+ String scheme = location.Scheme;
+ if(scheme == null)
+ {
+ throw new IOException("Transport not scheme specified: [" + location + "]");
+ }
+ ITransportFactory tf = TRANSPORT_FACTORYS[scheme];
+ if(tf == null)
+ {
+ throw new ApplicationException("Transport Factory for " + scheme + " does not exist.");
+ }
+ return tf;
+ }
+ }
+}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs Fri Oct 24 14:10:22 2008
@@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using Apache.NMS.ActiveMQ.Commands;
-using Apache.NMS.ActiveMQ.Transport;
+
using System;
+using Apache.NMS.ActiveMQ.Commands;
namespace Apache.NMS.ActiveMQ.Transport
{
@@ -28,6 +28,7 @@
protected readonly ITransport next;
protected CommandHandler commandHandler;
protected ExceptionHandler exceptionHandler;
+ private bool disposed = false;
public TransportFilter(ITransport next)
{
@@ -107,12 +108,12 @@
{
if(commandHandler == null)
{
- throw new InvalidOperationException ("command cannot be null when Start is called.");
+ throw new InvalidOperationException("command cannot be null when Start is called.");
}
if(exceptionHandler == null)
{
- throw new InvalidOperationException ("exception cannot be null when Start is called.");
+ throw new InvalidOperationException("exception cannot be null when Start is called.");
}
this.next.Start();
@@ -141,6 +142,15 @@
{
this.next.Dispose();
}
+ disposed = true;
+ }
+
+ public bool IsDisposed
+ {
+ get
+ {
+ return disposed;
+ }
}
public CommandHandler Command
@@ -149,12 +159,15 @@
set { this.commandHandler = value; }
}
- public ExceptionHandler Exception
+ public ExceptionHandler Exception
{
get { return exceptionHandler; }
set { this.exceptionHandler = value; }
}
+ public virtual void Stop()
+ {
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj Fri Oct 24 14:10:22 2008
@@ -237,6 +237,7 @@
<Compile Include="src\main\csharp\Commands\PartialCommand.cs">
<SubType>Code</SubType>
</Compile>
+ <Compile Include="src\main\csharp\Commands\ProducerAck.cs" />
<Compile Include="src\main\csharp\Commands\ProducerId.cs">
<SubType>Code</SubType>
</Compile>
@@ -303,6 +304,7 @@
<SubType>Code</SubType>
</Compile>
<Compile Include="src\main\csharp\DispatchingThread.cs" />
+ <Compile Include="src\main\csharp\IOException.cs" />
<Compile Include="src\main\csharp\ISynchronization.cs">
<SubType>Code</SubType>
</Compile>
@@ -700,12 +702,32 @@
<Compile Include="src\main\csharp\Session.cs">
<SubType>Code</SubType>
</Compile>
+ <Compile Include="src\main\csharp\State\CommandVisitorAdapter.cs" />
+ <Compile Include="src\main\csharp\State\ConnectionState.cs" />
+ <Compile Include="src\main\csharp\State\ConnectionStateTracker.cs" />
+ <Compile Include="src\main\csharp\State\ConsumerState.cs" />
+ <Compile Include="src\main\csharp\State\ICommandVisitor.cs" />
+ <Compile Include="src\main\csharp\State\ProducerState.cs" />
+ <Compile Include="src\main\csharp\State\SessionState.cs" />
+ <Compile Include="src\main\csharp\State\SynchronizedObjects.cs" />
+ <Compile Include="src\main\csharp\State\ThreadSimulator.cs" />
+ <Compile Include="src\main\csharp\State\Tracked.cs" />
+ <Compile Include="src\main\csharp\State\TransactionState.cs" />
+ <Compile Include="src\main\csharp\Threads\DefaultThreadPools.cs" />
+ <Compile Include="src\main\csharp\Threads\PooledTaskRunner.cs" />
+ <Compile Include="src\main\csharp\Threads\Task.cs" />
+ <Compile Include="src\main\csharp\Threads\TaskRunner.cs" />
+ <Compile Include="src\main\csharp\Threads\TaskRunnerFactory.cs" />
<Compile Include="src\main\csharp\TransactionContext.cs">
<SubType>Code</SubType>
</Compile>
+ <Compile Include="src\main\csharp\Transport\Failover\BackupTransport.cs" />
+ <Compile Include="src\main\csharp\Transport\Failover\FailoverTransport.cs" />
+ <Compile Include="src\main\csharp\Transport\Failover\FailoverTransportFactory.cs" />
<Compile Include="src\main\csharp\Transport\FutureResponse.cs">
<SubType>Code</SubType>
</Compile>
+ <Compile Include="src\main\csharp\Transport\ICompositeTransport.cs" />
<Compile Include="src\main\csharp\Transport\ITransport.cs">
<SubType>Code</SubType>
</Compile>
@@ -731,6 +753,7 @@
<Compile Include="src\main\csharp\Transport\Tcp\TcpTransportFactory.cs">
<SubType>Code</SubType>
</Compile>
+ <Compile Include="src\main\csharp\Transport\TransportFactory.cs" />
<Compile Include="src\main\csharp\Transport\TransportFilter.cs">
<SubType>Code</SubType>
</Compile>