You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2008/10/24 23:10:23 UTC
svn commit: r707747 [3/4] - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk: ./ src/main/csharp/
src/main/csharp/Commands/ src/main/csharp/State/ src/main/csharp/Threads/
src/main/csharp/Transport/ src/main/csharp/Transport/Failover/ src/main/cs...
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/Response.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/Response.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/Response.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/Response.cs Fri Oct 24 14:10:22 2008
@@ -20,47 +20,54 @@
// activemq-core module
//
-using System;
-using System.Collections;
-using Apache.NMS.ActiveMQ.OpenWire;
-using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.State;
namespace Apache.NMS.ActiveMQ.Commands
{
- /// <summary>
- /// The ActiveMQ Response Command
- /// </summary>
- public class Response : BaseCommand
- {
- public const byte ID_Response = 30;
-
- int correlationId;
-
- public override string ToString() {
- return GetType().Name + "["
- + " CorrelationId=" + CorrelationId
- + " ]";
+ /// <summary>
+ /// The ActiveMQ Response Command
+ /// </summary>
+ public class Response : BaseCommand
+ {
+ public const byte ID_Response = 30;
+
+ int correlationId;
+
+ public override string ToString()
+ {
+ return GetType().Name + "["
+ + " CorrelationId=" + CorrelationId
+ + " ]";
}
- public override byte GetDataStructureType() {
- return ID_Response;
- }
+ public override byte GetDataStructureType()
+ {
+ return ID_Response;
+ }
+
+ // Properties
+
+ public int CorrelationId
+ {
+ get { return correlationId; }
+ set { this.correlationId = value; }
+ }
- // Properties
-
- public int CorrelationId
- {
- get { return correlationId; }
- set { this.correlationId = value; }
- }
-
- public override bool IsResponse {
- get { return true; }
+ public override bool IsResponse
+ {
+ get
+ {
+ return true;
+ }
}
-
-
- }
+
+ public override Response visit(ICommandVisitor visitor)
+ {
+ return null;
+ }
+
+ }
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/SessionId.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/SessionId.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/SessionId.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/SessionId.cs Fri Oct 24 14:10:22 2008
@@ -20,72 +20,113 @@
// activemq-core module
//
-using System;
-using System.Collections;
-using Apache.NMS.ActiveMQ.OpenWire;
-using Apache.NMS.ActiveMQ.Commands;
namespace Apache.NMS.ActiveMQ.Commands
{
- /// <summary>
- /// The ActiveMQ SessionId Command
- /// </summary>
- public class SessionId : BaseDataStructure, DataStructure
- {
- public const byte ID_SessionId = 121;
-
- string connectionId;
- long value;
-
- public override int GetHashCode() {
- int answer = 0;
- answer = (answer * 37) + HashCode(ConnectionId);
- answer = (answer * 37) + HashCode(Value);
- return answer;
-
+ /// <summary>
+ /// The ActiveMQ SessionId Command
+ /// </summary>
+ public class SessionId : BaseDataStructure, DataStructure
+ {
+ public const byte ID_SessionId = 121;
+
+ string connectionId;
+ long value;
+ ConnectionId parentId;
+
+ public override int GetHashCode()
+ {
+ int answer = 0;
+ answer = (answer * 37) + HashCode(ConnectionId);
+ answer = (answer * 37) + HashCode(Value);
+ return answer;
}
- public override bool Equals(object that) {
- if (that is SessionId) {
- return Equals((SessionId) that);
+ public override bool Equals(object that)
+ {
+ if(that is SessionId)
+ {
+ return Equals((SessionId) that);
}
return false;
- }
+ }
- public virtual bool Equals(SessionId that) {
- if (! Equals(this.ConnectionId, that.ConnectionId)) return false;
- if (! Equals(this.Value, that.Value)) return false;
- return true;
+ public virtual bool Equals(SessionId that)
+ {
+ if(!Equals(this.ConnectionId, that.ConnectionId))
+ return false;
+ if(!Equals(this.Value, that.Value))
+ return false;
+ return true;
+ }
+ public override string ToString()
+ {
+ return GetType().Name + "["
+ + " ConnectionId=" + ConnectionId
+ + " Value=" + Value
+ + " ]";
}
- public override string ToString() {
- return GetType().Name + "["
- + " ConnectionId=" + ConnectionId
- + " Value=" + Value
- + " ]";
+ public override byte GetDataStructureType()
+ {
+ return ID_SessionId;
+ }
+
+ // Properties
+
+ public string ConnectionId
+ {
+ get { return connectionId; }
+ set { this.connectionId = value; }
+ }
+ public long Value
+ {
+ get { return value; }
+ set { this.value = value; }
}
- public override byte GetDataStructureType() {
- return ID_SessionId;
- }
+ public ConnectionId ParentId
+ {
+ get
+ {
+ if(parentId == null)
+ {
+ parentId = new ConnectionId(this);
+ }
+ return parentId;
+ }
+ }
+ public SessionId()
+ : base()
+ {
+ }
- // Properties
+ public SessionId(ConnectionId connectionId, long sessionId)
+ {
+ this.connectionId = connectionId.Value;
+ this.value = sessionId;
+ }
- public string ConnectionId
- {
- get { return connectionId; }
- set { this.connectionId = value; }
- }
+ public SessionId(SessionId id)
+ {
+ this.connectionId = id.ConnectionId;
+ this.value = id.Value;
+ }
- public long Value
- {
- get { return value; }
- set { this.value = value; }
- }
+ public SessionId(ProducerId id)
+ {
+ this.connectionId = id.ConnectionId;
+ this.value = id.Value;
+ }
- }
+ public SessionId(ConsumerId id)
+ {
+ this.connectionId = id.ConnectionId;
+ this.value = id.SessionId;
+ }
+ }
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/SessionInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/SessionInfo.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/SessionInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/SessionInfo.cs Fri Oct 24 14:10:22 2008
@@ -20,42 +20,55 @@
// activemq-core module
//
-using System;
-using System.Collections;
-using Apache.NMS.ActiveMQ.OpenWire;
-using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.State;
namespace Apache.NMS.ActiveMQ.Commands
{
- /// <summary>
- /// The ActiveMQ SessionInfo Command
- /// </summary>
- public class SessionInfo : BaseCommand
- {
- public const byte ID_SessionInfo = 4;
-
- SessionId sessionId;
+ /// <summary>
+ /// The ActiveMQ SessionInfo Command
+ /// </summary>
+ public class SessionInfo : BaseCommand
+ {
+ public const byte ID_SessionInfo = 4;
+
+ SessionId sessionId;
+
+ public override string ToString()
+ {
+ return GetType().Name + "["
+ + " SessionId=" + SessionId
+ + " ]";
- public override string ToString() {
- return GetType().Name + "["
- + " SessionId=" + SessionId
- + " ]";
+ }
+ public override byte GetDataStructureType()
+ {
+ return ID_SessionInfo;
}
- public override byte GetDataStructureType() {
- return ID_SessionInfo;
- }
+ // Properties
+
+ public SessionId SessionId
+ {
+ get { return sessionId; }
+ set { this.sessionId = value; }
+ }
- // Properties
+ public SessionInfo(ConnectionInfo connectionInfo, long sessionId)
+ {
+ this.sessionId = new SessionId(connectionInfo.ConnectionId, sessionId);
+ }
- public SessionId SessionId
- {
- get { return sessionId; }
- set { this.sessionId = value; }
- }
+ public SessionInfo()
+ : base()
+ {
+ }
- }
+ public override Response visit(ICommandVisitor visitor)
+ {
+ return visitor.processAddSession(this);
+ }
+ }
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ShutdownInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ShutdownInfo.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ShutdownInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ShutdownInfo.cs Fri Oct 24 14:10:22 2008
@@ -20,34 +20,46 @@
// activemq-core module
//
-using System;
-using System.Collections;
-using Apache.NMS.ActiveMQ.OpenWire;
-using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.State;
namespace Apache.NMS.ActiveMQ.Commands
{
- /// <summary>
- /// The ActiveMQ ShutdownInfo Command
- /// </summary>
- public class ShutdownInfo : BaseCommand
- {
- public const byte ID_ShutdownInfo = 11;
-
-
- public override string ToString() {
- return GetType().Name + "["
- + " ]";
+ /// <summary>
+ /// The ActiveMQ ShutdownInfo Command
+ /// </summary>
+ public class ShutdownInfo : BaseCommand
+ {
+ public const byte ID_ShutdownInfo = 11;
+
+ public override string ToString()
+ {
+ return GetType().Name + "["
+ + " ]";
+
+ }
+
+ public override byte GetDataStructureType()
+ {
+ return ID_ShutdownInfo;
}
- public override byte GetDataStructureType() {
- return ID_ShutdownInfo;
- }
+ // Properties
- // Properties
+ public override bool IsShutdownInfo
+ {
+ get
+ {
+ return true;
+ }
+ }
+
+ public override Response visit(ICommandVisitor visitor)
+ {
+ return visitor.processShutdown(this);
+ }
- }
+ }
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/TransactionInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/TransactionInfo.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/TransactionInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/TransactionInfo.cs Fri Oct 24 14:10:22 2008
@@ -20,58 +20,88 @@
// activemq-core module
//
-using System;
-using System.Collections;
-using Apache.NMS.ActiveMQ.OpenWire;
-using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.State;
namespace Apache.NMS.ActiveMQ.Commands
{
- /// <summary>
- /// The ActiveMQ TransactionInfo Command
- /// </summary>
- public class TransactionInfo : BaseCommand
- {
- public const byte ID_TransactionInfo = 7;
-
- ConnectionId connectionId;
- TransactionId transactionId;
- byte type;
-
- public override string ToString() {
- return GetType().Name + "["
- + " ConnectionId=" + ConnectionId
- + " TransactionId=" + TransactionId
- + " Type=" + Type
- + " ]";
+ /// <summary>
+ /// The ActiveMQ TransactionInfo Command
+ /// </summary>
+ public class TransactionInfo : BaseCommand
+ {
+ public const byte ID_TransactionInfo = 7;
+
+ ConnectionId connectionId;
+ TransactionId transactionId;
+ byte type;
+
+ public const byte BEGIN = 0;
+ public const byte PREPARE = 1;
+ public const byte COMMIT_ONE_PHASE = 2;
+ public const byte COMMIT_TWO_PHASE = 3;
+ public const byte ROLLBACK = 4;
+ public const byte RECOVER = 5;
+ public const byte FORGET = 6;
+ public const byte END = 7;
+
+ public override string ToString()
+ {
+ return GetType().Name + "["
+ + " ConnectionId=" + ConnectionId
+ + " TransactionId=" + TransactionId
+ + " Type=" + Type
+ + " ]";
+ }
+
+ public override byte GetDataStructureType()
+ {
+ return ID_TransactionInfo;
+ }
+
+ // Properties
+ public ConnectionId ConnectionId
+ {
+ get { return connectionId; }
+ set { this.connectionId = value; }
}
- public override byte GetDataStructureType() {
- return ID_TransactionInfo;
- }
-
-
- // Properties
-
- public ConnectionId ConnectionId
- {
- get { return connectionId; }
- set { this.connectionId = value; }
- }
-
- public TransactionId TransactionId
- {
- get { return transactionId; }
- set { this.transactionId = value; }
- }
-
- public byte Type
- {
- get { return type; }
- set { this.type = value; }
- }
+ public TransactionId TransactionId
+ {
+ get { return transactionId; }
+ set { this.transactionId = value; }
+ }
+
+ public byte Type
+ {
+ get { return type; }
+ set { this.type = value; }
+ }
- }
+ public override Response visit(ICommandVisitor visitor)
+ {
+ switch(type)
+ {
+ case TransactionInfo.BEGIN:
+ return visitor.processBeginTransaction(this);
+ case TransactionInfo.END:
+ return visitor.processEndTransaction(this);
+ case TransactionInfo.PREPARE:
+ return visitor.processPrepareTransaction(this);
+ case TransactionInfo.COMMIT_ONE_PHASE:
+ return visitor.processCommitTransactionOnePhase(this);
+ case TransactionInfo.COMMIT_TWO_PHASE:
+ return visitor.processCommitTransactionTwoPhase(this);
+ case TransactionInfo.ROLLBACK:
+ return visitor.processRollbackTransaction(this);
+ case TransactionInfo.RECOVER:
+ return visitor.processRecoverTransactions(this);
+ case TransactionInfo.FORGET:
+ return visitor.processForgetTransaction(this);
+ default:
+ throw new IOException("Transaction info type unknown: " + type);
+ }
+ }
+ }
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/WireFormatInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/WireFormatInfo.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/WireFormatInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/WireFormatInfo.cs Fri Oct 24 14:10:22 2008
@@ -15,9 +15,8 @@
* limitations under the License.
*/
-using System;
using Apache.NMS.ActiveMQ.OpenWire;
-using Apache.NMS;
+using Apache.NMS.ActiveMQ.State;
namespace Apache.NMS.ActiveMQ.Commands
{
@@ -29,14 +28,14 @@
{
public const byte ID_WireFormatInfo = 1;
static private byte[] MAGIC = new byte[] {
- 'A'&0xFF,
- 'c'&0xFF,
- 't'&0xFF,
- 'i'&0xFF,
- 'v'&0xFF,
- 'e'&0xFF,
- 'M'&0xFF,
- 'Q'&0xFF };
+ 'A'&0xFF,
+ 'c'&0xFF,
+ 't'&0xFF,
+ 'i'&0xFF,
+ 'v'&0xFF,
+ 'e'&0xFF,
+ 'M'&0xFF,
+ 'Q'&0xFF };
byte[] magic = MAGIC;
int version;
@@ -44,16 +43,18 @@
private PrimitiveMap properties;
- public override string ToString() {
+ public override string ToString()
+ {
return GetType().Name + "["
- + " Magic=" + Magic
- + " Version=" + Version
- + " MarshalledProperties=" + Properties.ToString()
- + " ]";
+ + " Magic=" + Magic
+ + " Version=" + Version
+ + " MarshalledProperties=" + Properties.ToString()
+ + " ]";
}
- public override byte GetDataStructureType() {
+ public override byte GetDataStructureType()
+ {
return ID_WireFormatInfo;
}
@@ -79,7 +80,7 @@
return false;
}
- for(int i = 0; i < magic.Length; i++ )
+ for(int i = 0; i < magic.Length; i++)
{
if(magic[i] != MAGIC[i])
{
@@ -147,8 +148,8 @@
{
object prop = Properties["MaxInactivityDuration"];
return (null != prop
- ? (long) prop
- : 0);
+ ? (long) prop
+ : 0);
}
set { Properties["MaxInactivityDuration"] = value; }
}
@@ -158,8 +159,8 @@
{
object prop = Properties["MaxInactivityDurationInitialDelay"];
return (null != prop
- ? (long) prop
- : 0);
+ ? (long) prop
+ : 0);
}
set { Properties["MaxInactivityDurationInitialDelay"] = value; }
}
@@ -169,8 +170,8 @@
{
object prop = Properties["CacheSize"];
return (null != prop
- ? (int) prop
- : 0);
+ ? (int) prop
+ : 0);
}
set { Properties.SetInt("CacheSize", value); }
}
@@ -190,5 +191,10 @@
MarshalledProperties = properties.Marshal();
}
}
+
+ public override Response visit(ICommandVisitor visitor)
+ {
+ return visitor.processWireFormat(this);
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Fri Oct 24 14:10:22 2008
@@ -14,13 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using Apache.NMS.ActiveMQ.Commands;
-using Apache.NMS.ActiveMQ.Transport;
-using Apache.NMS;
-using Apache.NMS.Util;
+
using System;
using System.Collections;
using System.Threading;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.Transport;
+using Apache.NMS.Util;
namespace Apache.NMS.ActiveMQ
{
@@ -170,8 +170,8 @@
Session session = new Session(this, info, sessionAcknowledgementMode);
// Set properties on session using parameters prefixed with "session."
- System.Collections.Specialized.StringDictionary map = URISupport.ParseQuery(this.brokerUri.Query);
- URISupport.SetProperties(session, map, "session.");
+ URISupport.CompositeData c = URISupport.parseComposite(this.brokerUri);
+ URISupport.SetProperties(session, c.Parameters, "session.");
if(IsStarted)
{
@@ -384,7 +384,7 @@
/// </summary>
public LocalTransactionId CreateLocalTransactionId()
{
- LocalTransactionId id= new LocalTransactionId();
+ LocalTransactionId id = new LocalTransactionId();
id.ConnectionId = ConnectionId;
id.Value = Interlocked.Increment(ref localTransactionCounter);
return id;
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs Fri Oct 24 14:10:22 2008
@@ -14,45 +14,45 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
+using System;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.Transport;
-using Apache.NMS.ActiveMQ.Transport.Tcp;
-using Apache.NMS;
using Apache.NMS.Util;
-using System;
namespace Apache.NMS.ActiveMQ
{
- /// <summary>
- /// Represents a connection with a message broker
- /// </summary>
- public class ConnectionFactory : IConnectionFactory
- {
+ /// <summary>
+ /// Represents a connection with a message broker
+ /// </summary>
+ public class ConnectionFactory : IConnectionFactory
+ {
public const string DEFAULT_BROKER_URL = "activemq:tcp://localhost:61616";
public const string ENV_BROKER_URL = "ACTIVEMQ_BROKER_URL";
-
- private Uri brokerUri;
- private string connectionUserName;
- private string connectionPassword;
- private string clientId;
-
+
+ private Uri brokerUri;
+ private string connectionUserName;
+ private string connectionPassword;
+ private string clientId;
+
public static string GetDefaultBrokerUrl()
{
#if (PocketPC||NETCF||NETCF_2_0)
- return DEFAULT_BROKER_URL;
+ return DEFAULT_BROKER_URL;
#else
- string answer = Environment.GetEnvironmentVariable(ENV_BROKER_URL);
- if (answer == null) {
+ string answer = Environment.GetEnvironmentVariable(ENV_BROKER_URL);
+ if(answer == null)
+ {
answer = DEFAULT_BROKER_URL;
}
return answer;
#endif
}
-
- public ConnectionFactory()
+
+ public ConnectionFactory()
: this(GetDefaultBrokerUrl())
- {
- }
+ {
+ }
public ConnectionFactory(string brokerUri)
: this(brokerUri, null)
@@ -76,83 +76,85 @@
}
public IConnection CreateConnection()
- {
- return CreateConnection(connectionUserName, connectionPassword);
- }
-
- public IConnection CreateConnection(string userName, string password)
- {
- Uri uri = brokerUri;
- // Do we need to strip off the activemq prefix??
- if("activemq".Equals(brokerUri.Scheme))
- {
- uri = new Uri(brokerUri.AbsolutePath + brokerUri.Query);
- }
+ {
+ return CreateConnection(connectionUserName, connectionPassword);
+ }
+
+ public IConnection CreateConnection(string userName, string password)
+ {
+ Uri uri = brokerUri;
+ // Do we need to strip off the activemq prefix??
+ if("activemq".Equals(brokerUri.Scheme))
+ {
+ uri = new Uri(brokerUri.AbsolutePath + brokerUri.Query);
+ }
ConnectionInfo info = CreateConnectionInfo(userName, password);
- ITransportFactory tcpTransportFactory = new TcpTransportFactory();
- ITransport transport = tcpTransportFactory.CreateTransport(uri);
+ ITransport transport = TransportFactory.CreateTransport(uri);
Connection connection = new Connection(uri, transport, info);
// Set properties on connection using parameters prefixed with "connection."
- System.Collections.Specialized.StringDictionary map = URISupport.ParseQuery(brokerUri.Query);
- URISupport.SetProperties(connection, map, "connection.");
+ // Since this could be a composite Uri, assume the connection-specific parameters
+ // are associated with the outer-most specification of the composite Uri. What's nice
+ // is that this works with simple Uri as well.
+ URISupport.CompositeData c = URISupport.parseComposite(brokerUri);
+ URISupport.SetProperties(connection, c.Parameters, "connection.");
return connection;
- }
-
- // Properties
-
- public Uri BrokerUri
- {
- get { return brokerUri; }
- set { brokerUri = value; }
- }
-
- public string UserName
- {
- get { return connectionUserName; }
- set { connectionUserName = value; }
- }
-
- public string Password
- {
- get { return connectionPassword; }
- set { connectionPassword = value; }
- }
+ }
+
+ // Properties
+
+ public Uri BrokerUri
+ {
+ get { return brokerUri; }
+ set { brokerUri = value; }
+ }
+
+ public string UserName
+ {
+ get { return connectionUserName; }
+ set { connectionUserName = value; }
+ }
+
+ public string Password
+ {
+ get { return connectionPassword; }
+ set { connectionPassword = value; }
+ }
public string ClientId
- {
- get { return clientId; }
- set { clientId = value; }
- }
-
- // Implementation methods
-
- protected virtual ConnectionInfo CreateConnectionInfo(string userName, string password)
- {
- ConnectionInfo answer = new ConnectionInfo();
- ConnectionId connectionId = new ConnectionId();
- connectionId.Value = CreateNewGuid();
-
- answer.ConnectionId = connectionId;
- answer.UserName = userName;
- answer.Password = password;
- if(clientId == null)
- {
- answer.ClientId = CreateNewGuid();
- }
+ {
+ get { return clientId; }
+ set { clientId = value; }
+ }
+
+ // Implementation methods
+
+ protected virtual ConnectionInfo CreateConnectionInfo(string userName, string password)
+ {
+ ConnectionInfo answer = new ConnectionInfo();
+ ConnectionId connectionId = new ConnectionId();
+ connectionId.Value = CreateNewGuid();
+
+ answer.ConnectionId = connectionId;
+ answer.UserName = userName;
+ answer.Password = password;
+ if(clientId == null)
+ {
+ answer.ClientId = CreateNewGuid();
+ }
else
{
answer.ClientId = clientId;
}
- return answer;
- }
-
- protected static string CreateNewGuid()
- {
- return Guid.NewGuid().ToString();
- }
-
- }
+ return answer;
+ }
+
+ protected static string CreateNewGuid()
+ {
+ return Guid.NewGuid().ToString();
+ }
+
+ }
}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/IOException.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/IOException.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/IOException.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/IOException.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+using Apache.NMS.ActiveMQ.Commands;
+using System.Text;
+using Apache.NMS;
+
+namespace Apache.NMS.ActiveMQ
+{
+
+ /// <summary>
+ /// Exception thrown when an IO error occurs
+ /// </summary>
+ public class IOException : NMSException
+ {
+ public IOException()
+ : base("IO Exception failed with missing exception log")
+ {
+ }
+
+ public IOException(String msg)
+ : base(msg)
+ {
+ }
+
+ public IOException(String msg, Exception inner)
+ : base(msg, inner)
+ {
+ }
+ }
+}
+
+
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Fri Oct 24 14:10:22 2008
@@ -445,7 +445,7 @@
{
DestinationInfo command = new DestinationInfo();
command.ConnectionId = Connection.ConnectionId;
- command.OperationType = 0; // 0 is add
+ command.OperationType = DestinationInfo.ADD_OPERATION_TYPE ; // 0 is add
command.Destination = tempDestination;
this.DoSend(command);
@@ -455,7 +455,7 @@
{
DestinationInfo command = new DestinationInfo();
command.ConnectionId = Connection.ConnectionId;
- command.OperationType = 1; // 1 is remove
+ command.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE ; // 1 is remove
command.Destination = tempDestination;
this.DoSend(command);
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+ public class CommandVisitorAdapter : ICommandVisitor
+ {
+
+ public virtual Response processAddConnection(ConnectionInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processAddConsumer(ConsumerInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processAddDestination(DestinationInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processAddProducer(ProducerInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processAddSession(SessionInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processBeginTransaction(TransactionInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processBrokerInfo(BrokerInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processCommitTransactionOnePhase(TransactionInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processCommitTransactionTwoPhase(TransactionInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processEndTransaction(TransactionInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processFlush(FlushCommand command)
+ {
+ return null;
+ }
+
+ public virtual Response processForgetTransaction(TransactionInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processKeepAlive(KeepAliveInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processMessage(Message send)
+ {
+ return null;
+ }
+
+ public virtual Response processMessageAck(MessageAck ack)
+ {
+ return null;
+ }
+
+ public virtual Response processMessageDispatchNotification(MessageDispatchNotification notification)
+ {
+ return null;
+ }
+
+ public virtual Response processMessagePull(MessagePull pull)
+ {
+ return null;
+ }
+
+ public virtual Response processPrepareTransaction(TransactionInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processProducerAck(ProducerAck ack)
+ {
+ return null;
+ }
+
+ public virtual Response processRecoverTransactions(TransactionInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processRemoveConnection(ConnectionId id)
+ {
+ return null;
+ }
+
+ public virtual Response processRemoveConsumer(ConsumerId id)
+ {
+ return null;
+ }
+
+ public virtual Response processRemoveDestination(DestinationInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processRemoveProducer(ProducerId id)
+ {
+ return null;
+ }
+
+ public virtual Response processRemoveSession(SessionId id)
+ {
+ return null;
+ }
+
+ public virtual Response processRemoveSubscription(RemoveSubscriptionInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processRollbackTransaction(TransactionInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processShutdown(ShutdownInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processWireFormat(WireFormatInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processMessageDispatch(MessageDispatch dispatch)
+ {
+ return null;
+ }
+
+ public virtual Response processControlCommand(ControlCommand command)
+ {
+ return null;
+ }
+
+ public virtual Response processConnectionControl(ConnectionControl control)
+ {
+ return null;
+ }
+
+ public virtual Response processConnectionError(ConnectionError error)
+ {
+ return null;
+ }
+
+ public virtual Response processConsumerControl(ConsumerControl control)
+ {
+ return null;
+ }
+
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+ /// <summary>
+ /// Holds what is tracked for a single connection: its ConnectionInfo, the
+ /// sessions and transactions registered against it, and the temporary
+ /// destinations added to it.
+ /// </summary>
+ public class ConnectionState
+ {
+     private ConnectionInfo connectionInfo;
+     private SynchronizedDictionary<TransactionId, TransactionState> transactions = new SynchronizedDictionary<TransactionId, TransactionState>();
+     private SynchronizedDictionary<SessionId, SessionState> sessions = new SynchronizedDictionary<SessionId, SessionState>();
+     private SynchronizedCollection<DestinationInfo> tempDestinations = new SynchronizedCollection<DestinationInfo>();
+     private AtomicBoolean _shutdown = new AtomicBoolean(false);
+
+     /// <summary>
+     /// Creates the state for a connection and registers its default session
+     /// (a SessionInfo built from the connection info and the value -1).
+     /// </summary>
+     /// <param name="info">Connection description to track.</param>
+     public ConnectionState(ConnectionInfo info)
+     {
+         connectionInfo = info;
+         SessionInfo defaultSession = new SessionInfo(info, -1);
+         addSession(defaultSession);
+     }
+
+     /// <summary>Connection description this state was created or last reset with.</summary>
+     public ConnectionInfo Info
+     {
+         get { return connectionInfo; }
+     }
+
+     /// <summary>Transaction state registered under the given id.</summary>
+     public TransactionState this[TransactionId id]
+     {
+         get { return transactions[id]; }
+     }
+
+     /// <summary>Session state registered under the given id.</summary>
+     public SessionState this[SessionId id]
+     {
+         get { return sessions[id]; }
+     }
+
+     /// <summary>All tracked transaction states.</summary>
+     public SynchronizedCollection<TransactionState> TransactionStates
+     {
+         get { return transactions.Values; }
+     }
+
+     /// <summary>Ids of all tracked sessions.</summary>
+     public SynchronizedCollection<SessionId> SessionIds
+     {
+         get { return sessions.Keys; }
+     }
+
+     /// <summary>All tracked session states.</summary>
+     public SynchronizedCollection<SessionState> SessionStates
+     {
+         get { return sessions.Values; }
+     }
+
+     /// <summary>The temporary destinations recorded for this connection.</summary>
+     public SynchronizedCollection<DestinationInfo> TempDestinations
+     {
+         get { return tempDestinations; }
+     }
+
+     /// <summary>Delegates to the tracked ConnectionInfo's string form.</summary>
+     public override String ToString()
+     {
+         return connectionInfo.ToString();
+     }
+
+     /// <summary>
+     /// Replaces the connection info, drops every tracked transaction, session
+     /// and temporary destination, and clears the shutdown flag.
+     /// </summary>
+     /// <param name="info">New connection description.</param>
+     public void reset(ConnectionInfo info)
+     {
+         connectionInfo = info;
+         transactions.Clear();
+         sessions.Clear();
+         tempDestinations.Clear();
+         _shutdown.Value = false;
+     }
+
+     /// <summary>Records a temporary destination. Throws if this state has been shut down.</summary>
+     public void addTempDestination(DestinationInfo info)
+     {
+         checkShutdown();
+         tempDestinations.Add(info);
+     }
+
+     /// <summary>Removes every recorded entry whose Destination equals the given one.</summary>
+     public void removeTempDestination(ActiveMQDestination destination)
+     {
+         // Scan from the back so RemoveAt does not shift entries still to be visited.
+         int index = tempDestinations.Count;
+         while(--index >= 0)
+         {
+             DestinationInfo candidate = tempDestinations[index];
+             if(candidate.Destination.Equals(destination))
+             {
+                 tempDestinations.RemoveAt(index);
+             }
+         }
+     }
+
+     /// <summary>Starts tracking a transaction. Throws if this state has been shut down.</summary>
+     public void addTransactionState(TransactionId id)
+     {
+         checkShutdown();
+         TransactionState created = new TransactionState(id);
+         transactions.Add(id, created);
+     }
+
+     /// <summary>Stops tracking a transaction and returns the state that was stored for it.</summary>
+     public TransactionState removeTransactionState(TransactionId id)
+     {
+         TransactionState removed = transactions[id];
+         transactions.Remove(id);
+         return removed;
+     }
+
+     /// <summary>Starts tracking a session. Throws if this state has been shut down.</summary>
+     public void addSession(SessionInfo info)
+     {
+         checkShutdown();
+         SessionState created = new SessionState(info);
+         sessions.Add(info.SessionId, created);
+     }
+
+     /// <summary>Stops tracking a session and returns the state that was stored for it.</summary>
+     public SessionState removeSession(SessionId id)
+     {
+         SessionState removed = sessions[id];
+         sessions.Remove(id);
+         return removed;
+     }
+
+     /// <summary>
+     /// Marks this state as shut down. Only the first call has any effect;
+     /// that call also shuts down every tracked session.
+     /// </summary>
+     public void shutdown()
+     {
+         if(!_shutdown.CompareAndSet(false, true))
+         {
+             return;
+         }
+
+         foreach(SessionState session in sessions.Values)
+         {
+             session.shutdown();
+         }
+     }
+
+     // Rejects mutating calls once shutdown() has run.
+     private void checkShutdown()
+     {
+         if(!_shutdown.Value)
+         {
+             return;
+         }
+
+         throw new ApplicationException("Disposed");
+     }
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,656 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.Transport;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+ /// <summary>
+ /// Tracks the state of a connection so a newly established transport can be
+ /// re-initialized to the state that was tracked.
+ /// </summary>
+ /// <remarks>
+ /// Commands are fed in through <see cref="track"/>, which dispatches them to
+ /// the process* overrides below. <see cref="DoRestore"/> replays the recorded
+ /// state onto a transport.
+ /// </remarks>
+ public class ConnectionStateTracker : CommandVisitorAdapter
+ {
+
+     // Shared marker (a Tracked built with a null argument) returned for
+     // commands that are state tracked.
+     private static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
+
+     protected Dictionary<ConnectionId, ConnectionState> connectionStates = new Dictionary<ConnectionId, ConnectionState>();
+
+     private bool _trackTransactions;
+     private bool _restoreSessions = true;
+     private bool _restoreConsumers = true;
+     private bool _restoreProducers = true;
+     private bool _restoreTransaction = true;
+     private bool _trackMessages = true;
+     private int _maxCacheSize = 256;
+     private int currentCacheSize;
+     // Cached non-transacted messages plus the order in which their ids were cached.
+     private Dictionary<MessageId, Message> messageCache = new Dictionary<MessageId, Message>();
+     private Queue<MessageId> messageCacheFIFO = new Queue<MessageId>();
+
+     /// <summary>
+     /// Evicts the oldest cached messages until the FIFO holds at most
+     /// <see cref="MaxCacheSize"/> ids.
+     /// </summary>
+     protected void RemoveEldestInCache()
+     {
+         System.Collections.ICollection ic = messageCacheFIFO;
+         lock(ic.SyncRoot)
+         {
+             while(messageCacheFIFO.Count > MaxCacheSize)
+             {
+                 messageCache.Remove(messageCacheFIFO.Dequeue());
+                 currentCacheSize = currentCacheSize - 1;
+             }
+         }
+     }
+
+     // Stores a clone of the message in the cache and records its id in the
+     // FIFO so that RemoveEldestInCache has something to evict. Without the
+     // Enqueue the FIFO stays empty and the cache grows without bound.
+     private void CacheMessage(Message send)
+     {
+         System.Collections.ICollection ic = messageCacheFIFO;
+         lock(ic.SyncRoot)
+         {
+             // Only queue an id once, so a re-sent message does not get evicted twice.
+             if(!messageCache.ContainsKey(send.MessageId))
+             {
+                 messageCacheFIFO.Enqueue(send.MessageId);
+             }
+             // Indexer assignment replaces an existing entry instead of throwing.
+             messageCache[send.MessageId] = (Message) send.Clone();
+         }
+         RemoveEldestInCache();
+     }
+
+     // Returns the tracked state for a connection, or null when the id is null
+     // or unknown. The Dictionary indexer would throw KeyNotFoundException for
+     // an unknown id, which would defeat the callers' null checks.
+     private ConnectionState FindConnectionState(ConnectionId connectionId)
+     {
+         ConnectionState cs;
+         if(connectionId != null && connectionStates.TryGetValue(connectionId, out cs))
+         {
+             return cs;
+         }
+         return null;
+     }
+
+     // Returns the session state for a session id via its parent connection,
+     // or null when the id is null or its connection is not tracked.
+     private SessionState FindSessionState(SessionId sessionId)
+     {
+         if(sessionId == null)
+         {
+             return null;
+         }
+         ConnectionState cs = FindConnectionState(sessionId.ParentId);
+         if(cs == null)
+         {
+             return null;
+         }
+         return cs[sessionId];
+     }
+
+     // Returns the transaction state held by the given connection, or null
+     // when the connection is not tracked.
+     private TransactionState FindTransactionState(ConnectionId connectionId, TransactionId transactionId)
+     {
+         ConnectionState cs = FindConnectionState(connectionId);
+         if(cs == null)
+         {
+             return null;
+         }
+         return cs[transactionId];
+     }
+
+     // Action attached to the Tracked response of a commit or rollback; when
+     // run it removes the finished transaction from its connection's state.
+     private class RemoveTransactionAction : ThreadSimulator
+     {
+         private TransactionInfo info;
+         private ConnectionStateTracker cst;
+
+         public RemoveTransactionAction(TransactionInfo info, ConnectionStateTracker aCst)
+         {
+             this.info = info;
+             this.cst = aCst;
+         }
+
+         public override void Run()
+         {
+             // The connection may have been removed in the meantime; then there is nothing to clean up.
+             ConnectionState cs = cst.FindConnectionState(info.ConnectionId);
+             if(cs != null)
+             {
+                 cs.removeTransactionState(info.TransactionId);
+             }
+         }
+     }
+
+     /// <summary>
+     /// Dispatches the command to the matching process* method of this tracker.
+     /// </summary>
+     /// <param name="command">The command to inspect.</param>
+     /// <returns>null if the command is not state tracked.</returns>
+     /// <exception cref="IOException">
+     /// Rethrown unchanged when raised while visiting; any other exception is
+     /// wrapped in a new IOException carrying its message.
+     /// </exception>
+     public Tracked track(Command command)
+     {
+         try
+         {
+             return (Tracked) command.visit(this);
+         }
+         catch(IOException)
+         {
+             // "throw;" keeps the original stack trace; "throw e;" would reset it.
+             throw;
+         }
+         catch(Exception e)
+         {
+             // NOTE(review): attach e as the inner exception if IOException offers such a constructor.
+             throw new IOException(e.Message);
+         }
+     }
+
+     /// <summary>
+     /// Increments the cache size counter for a non-transacted message command
+     /// when message tracking is enabled.
+     /// </summary>
+     /// <param name="command">The command to account for; may be null.</param>
+     public void trackBack(Command command)
+     {
+         if(TrackMessages && command != null && command.IsMessage)
+         {
+             Message message = (Message) command;
+             if(message.TransactionId == null)
+             {
+                 currentCacheSize = currentCacheSize + 1;
+             }
+         }
+     }
+
+     /// <summary>
+     /// Replays the tracked state onto a transport. For each connection it sends
+     /// the connection info and temp destinations, then sessions and
+     /// transactions if enabled. Finally it sends every cached message.
+     /// </summary>
+     /// <param name="transport">Transport that receives the replayed commands.</param>
+     public void DoRestore(ITransport transport)
+     {
+         // Restore the connections.
+         foreach(ConnectionState connectionState in connectionStates.Values)
+         {
+             transport.Oneway(connectionState.Info);
+             DoRestoreTempDestinations(transport, connectionState);
+
+             if(RestoreSessions)
+             {
+                 DoRestoreSessions(transport, connectionState);
+             }
+
+             if(RestoreTransaction)
+             {
+                 DoRestoreTransactions(transport, connectionState);
+             }
+         }
+         //now flush messages
+         foreach(Message msg in messageCache.Values)
+         {
+             transport.Oneway(msg);
+         }
+     }
+
+     // Re-sends every command recorded in each of the connection's transactions.
+     private void DoRestoreTransactions(ITransport transport, ConnectionState connectionState)
+     {
+         SynchronizedCollection<TransactionState> transactionStates = connectionState.TransactionStates;
+         foreach(TransactionState transactionState in transactionStates)
+         {
+             foreach(Command command in transactionState.Commands)
+             {
+                 transport.Oneway(command);
+             }
+         }
+     }
+
+     /// <summary>
+     /// Re-sends each session's info, followed by its producers and consumers
+     /// when the corresponding restore flags are set.
+     /// </summary>
+     /// <param name="transport">Transport that receives the replayed commands.</param>
+     /// <param name="connectionState">Connection whose sessions are restored.</param>
+     protected void DoRestoreSessions(ITransport transport, ConnectionState connectionState)
+     {
+         // Restore the connection's sessions
+         foreach(SessionState sessionState in connectionState.SessionStates)
+         {
+             transport.Oneway(sessionState.Info);
+
+             if(RestoreProducers)
+             {
+                 DoRestoreProducers(transport, sessionState);
+             }
+
+             if(RestoreConsumers)
+             {
+                 DoRestoreConsumers(transport, sessionState);
+             }
+         }
+     }
+
+     /// <summary>
+     /// Re-sends the info of every consumer tracked for the session.
+     /// </summary>
+     /// <param name="transport">Transport that receives the replayed commands.</param>
+     /// <param name="sessionState">Session whose consumers are restored.</param>
+     protected void DoRestoreConsumers(ITransport transport, SessionState sessionState)
+     {
+         // Restore the session's consumers
+         foreach(ConsumerState consumerState in sessionState.ConsumerStates)
+         {
+             transport.Oneway(consumerState.Info);
+         }
+     }
+
+     /// <summary>
+     /// Re-sends the info of every producer tracked for the session.
+     /// </summary>
+     /// <param name="transport">Transport that receives the replayed commands.</param>
+     /// <param name="sessionState">Session whose producers are restored.</param>
+     protected void DoRestoreProducers(ITransport transport, SessionState sessionState)
+     {
+         // Restore the session's producers
+         foreach(ProducerState producerState in sessionState.ProducerStates)
+         {
+             transport.Oneway(producerState.Info);
+         }
+     }
+
+     /// <summary>
+     /// Re-sends every temporary destination tracked for the connection.
+     /// </summary>
+     /// <param name="transport">Transport that receives the replayed commands.</param>
+     /// <param name="connectionState">Connection whose temp destinations are restored.</param>
+     protected void DoRestoreTempDestinations(ITransport transport, ConnectionState connectionState)
+     {
+         // Restore the connection's temp destinations.
+         foreach(DestinationInfo destinationInfo in connectionState.TempDestinations)
+         {
+             transport.Oneway(destinationInfo);
+         }
+     }
+
+     /// <summary>Records a temporary destination against its (tracked) connection.</summary>
+     public override Response processAddDestination(DestinationInfo info)
+     {
+         if(info != null)
+         {
+             ConnectionState cs = FindConnectionState(info.ConnectionId);
+             if(cs != null && info.Destination.IsTemporary)
+             {
+                 cs.addTempDestination(info);
+             }
+         }
+         return TRACKED_RESPONSE_MARKER;
+     }
+
+     /// <summary>Forgets a temporary destination of a tracked connection.</summary>
+     public override Response processRemoveDestination(DestinationInfo info)
+     {
+         if(info != null)
+         {
+             ConnectionState cs = FindConnectionState(info.ConnectionId);
+             if(cs != null && info.Destination.IsTemporary)
+             {
+                 cs.removeTempDestination(info.Destination);
+             }
+         }
+         return TRACKED_RESPONSE_MARKER;
+     }
+
+     /// <summary>Records a producer against its session, when that session is tracked.</summary>
+     public override Response processAddProducer(ProducerInfo info)
+     {
+         if(info != null && info.ProducerId != null)
+         {
+             SessionState ss = FindSessionState(info.ProducerId.ParentId);
+             if(ss != null)
+             {
+                 ss.addProducer(info);
+             }
+         }
+         return TRACKED_RESPONSE_MARKER;
+     }
+
+     /// <summary>Forgets a producer of a tracked session.</summary>
+     public override Response processRemoveProducer(ProducerId id)
+     {
+         if(id != null)
+         {
+             SessionState ss = FindSessionState(id.ParentId);
+             if(ss != null)
+             {
+                 ss.removeProducer(id);
+             }
+         }
+         return TRACKED_RESPONSE_MARKER;
+     }
+
+     /// <summary>Records a consumer against its session, when that session is tracked.</summary>
+     public override Response processAddConsumer(ConsumerInfo info)
+     {
+         // ConsumerId is null-checked the same way processAddProducer checks ProducerId.
+         if(info != null && info.ConsumerId != null)
+         {
+             SessionState ss = FindSessionState(info.ConsumerId.ParentId);
+             if(ss != null)
+             {
+                 ss.addConsumer(info);
+             }
+         }
+         return TRACKED_RESPONSE_MARKER;
+     }
+
+     /// <summary>Forgets a consumer of a tracked session.</summary>
+     public override Response processRemoveConsumer(ConsumerId id)
+     {
+         if(id != null)
+         {
+             SessionState ss = FindSessionState(id.ParentId);
+             if(ss != null)
+             {
+                 ss.removeConsumer(id);
+             }
+         }
+         return TRACKED_RESPONSE_MARKER;
+     }
+
+     /// <summary>Records a session against its connection, when that connection is tracked.</summary>
+     public override Response processAddSession(SessionInfo info)
+     {
+         if(info != null)
+         {
+             ConnectionState cs = FindConnectionState(info.SessionId.ParentId);
+             if(cs != null)
+             {
+                 cs.addSession(info);
+             }
+         }
+         return TRACKED_RESPONSE_MARKER;
+     }
+
+     /// <summary>Forgets a session of a tracked connection.</summary>
+     public override Response processRemoveSession(SessionId id)
+     {
+         if(id != null)
+         {
+             ConnectionState cs = FindConnectionState(id.ParentId);
+             if(cs != null)
+             {
+                 cs.removeSession(id);
+             }
+         }
+         return TRACKED_RESPONSE_MARKER;
+     }
+
+     /// <summary>Starts tracking a connection, replacing any state already held for the same id.</summary>
+     public override Response processAddConnection(ConnectionInfo info)
+     {
+         if(info != null)
+         {
+             // Indexer assignment: Dictionary.Add would throw if the id is registered twice.
+             connectionStates[info.ConnectionId] = new ConnectionState(info);
+         }
+         return TRACKED_RESPONSE_MARKER;
+     }
+
+     /// <summary>Stops tracking a connection.</summary>
+     public override Response processRemoveConnection(ConnectionId id)
+     {
+         if(id != null)
+         {
+             connectionStates.Remove(id);
+         }
+         return TRACKED_RESPONSE_MARKER;
+     }
+
+     /// <summary>
+     /// Records a sent message. A transacted message is appended to its
+     /// transaction when transaction tracking is on; otherwise, when message
+     /// tracking is on, a clone is put in the bounded message cache.
+     /// </summary>
+     /// <returns>The tracked marker for a transacted message that was handled; otherwise null.</returns>
+     public override Response processMessage(Message send)
+     {
+         if(send != null)
+         {
+             if(TrackTransactions && send.TransactionId != null)
+             {
+                 ConnectionId connectionId = send.ProducerId.ParentId.ParentId;
+                 if(connectionId != null)
+                 {
+                     TransactionState transactionState = FindTransactionState(connectionId, send.TransactionId);
+                     if(transactionState != null)
+                     {
+                         transactionState.addCommand(send);
+                     }
+                 }
+                 return TRACKED_RESPONSE_MARKER;
+             }
+             else if(TrackMessages)
+             {
+                 CacheMessage(send);
+             }
+         }
+         return null;
+     }
+
+     /// <summary>Appends a transacted acknowledgement to its transaction when transaction tracking is on.</summary>
+     /// <returns>The tracked marker when the ack was transacted and tracking is on; otherwise null.</returns>
+     public override Response processMessageAck(MessageAck ack)
+     {
+         if(TrackTransactions && ack != null && ack.TransactionId != null)
+         {
+             ConnectionId connectionId = ack.ConsumerId.ParentId.ParentId;
+             if(connectionId != null)
+             {
+                 TransactionState transactionState = FindTransactionState(connectionId, ack.TransactionId);
+                 if(transactionState != null)
+                 {
+                     transactionState.addCommand(ack);
+                 }
+             }
+             return TRACKED_RESPONSE_MARKER;
+         }
+         return null;
+     }
+
+     /// <summary>Creates the transaction state on its connection and records the begin command in it.</summary>
+     /// <returns>The tracked marker when transaction tracking handled the command; otherwise null.</returns>
+     public override Response processBeginTransaction(TransactionInfo info)
+     {
+         if(TrackTransactions && info != null && info.TransactionId != null)
+         {
+             ConnectionState cs = FindConnectionState(info.ConnectionId);
+             if(cs != null)
+             {
+                 cs.addTransactionState(info.TransactionId);
+                 TransactionState state = cs[info.TransactionId];
+                 state.addCommand(info);
+             }
+             return TRACKED_RESPONSE_MARKER;
+         }
+         return null;
+     }
+
+     /// <summary>Records the prepare command in its transaction.</summary>
+     public override Response processPrepareTransaction(TransactionInfo info)
+     {
+         return RecordTransactionStep(info);
+     }
+
+     /// <summary>Records the one-phase commit; the response removes the transaction when run.</summary>
+     public override Response processCommitTransactionOnePhase(TransactionInfo info)
+     {
+         return RecordTransactionCompletion(info);
+     }
+
+     /// <summary>Records the two-phase commit; the response removes the transaction when run.</summary>
+     public override Response processCommitTransactionTwoPhase(TransactionInfo info)
+     {
+         return RecordTransactionCompletion(info);
+     }
+
+     /// <summary>Records the rollback; the response removes the transaction when run.</summary>
+     public override Response processRollbackTransaction(TransactionInfo info)
+     {
+         return RecordTransactionCompletion(info);
+     }
+
+     /// <summary>Records the end command in its transaction.</summary>
+     public override Response processEndTransaction(TransactionInfo info)
+     {
+         return RecordTransactionStep(info);
+     }
+
+     // Shared body of prepare/end: append the command to its transaction if
+     // that transaction is tracked. Returns the tracked marker whenever
+     // transaction tracking is on and info is non-null, else null.
+     private Response RecordTransactionStep(TransactionInfo info)
+     {
+         if(TrackTransactions && info != null)
+         {
+             TransactionState transactionState = FindTransactionState(info.ConnectionId, info.TransactionId);
+             if(transactionState != null)
+             {
+                 transactionState.addCommand(info);
+             }
+             return TRACKED_RESPONSE_MARKER;
+         }
+         return null;
+     }
+
+     // Shared body of commit/rollback: append the command to its transaction
+     // and return a Tracked whose action removes the transaction state.
+     // Returns null when tracking is off or the transaction is not tracked.
+     private Response RecordTransactionCompletion(TransactionInfo info)
+     {
+         if(TrackTransactions && info != null)
+         {
+             TransactionState transactionState = FindTransactionState(info.ConnectionId, info.TransactionId);
+             if(transactionState != null)
+             {
+                 transactionState.addCommand(info);
+                 return new Tracked(new RemoveTransactionAction(info, this));
+             }
+         }
+         return null;
+     }
+
+     /// <summary>Whether DoRestoreSessions re-sends each session's consumers. Defaults to true.</summary>
+     public bool RestoreConsumers
+     {
+         get
+         {
+             return _restoreConsumers;
+         }
+         set
+         {
+             _restoreConsumers = value;
+         }
+     }
+
+     /// <summary>Whether DoRestoreSessions re-sends each session's producers. Defaults to true.</summary>
+     public bool RestoreProducers
+     {
+         get
+         {
+             return _restoreProducers;
+         }
+         set
+         {
+             _restoreProducers = value;
+         }
+     }
+
+     /// <summary>Whether DoRestore re-sends sessions. Defaults to true.</summary>
+     public bool RestoreSessions
+     {
+         get
+         {
+             return _restoreSessions;
+         }
+         set
+         {
+             _restoreSessions = value;
+         }
+     }
+
+     /// <summary>Whether transactional commands are recorded by the process* methods. Defaults to false.</summary>
+     public bool TrackTransactions
+     {
+         get
+         {
+             return _trackTransactions;
+         }
+         set
+         {
+             _trackTransactions = value;
+         }
+     }
+
+     /// <summary>Whether DoRestore re-sends recorded transaction commands. Defaults to true.</summary>
+     public bool RestoreTransaction
+     {
+         get
+         {
+             return _restoreTransaction;
+         }
+         set
+         {
+             _restoreTransaction = value;
+         }
+     }
+
+     /// <summary>Whether non-transacted messages are cached for replay. Defaults to true.</summary>
+     public bool TrackMessages
+     {
+         get
+         {
+             return _trackMessages;
+         }
+         set
+         {
+             _trackMessages = value;
+         }
+     }
+
+     /// <summary>Maximum number of message ids kept in the cache FIFO before eviction. Defaults to 256.</summary>
+     public int MaxCacheSize
+     {
+         get
+         {
+             return _maxCacheSize;
+         }
+         set
+         {
+             _maxCacheSize = value;
+         }
+     }
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConsumerState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConsumerState.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConsumerState.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConsumerState.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+
+ /// <summary>
+ /// Wrapper that holds the ConsumerInfo of one tracked consumer.
+ /// </summary>
+ public class ConsumerState
+ {
+     private ConsumerInfo consumerInfo;
+
+     /// <summary>Creates the state for the given consumer description.</summary>
+     /// <param name="info">Consumer description to hold.</param>
+     public ConsumerState(ConsumerInfo info)
+     {
+         consumerInfo = info;
+     }
+
+     /// <summary>The consumer description passed to the constructor.</summary>
+     public ConsumerInfo Info
+     {
+         get { return consumerInfo; }
+     }
+
+     /// <summary>Delegates to the held ConsumerInfo's string form.</summary>
+     public override String ToString()
+     {
+         return consumerInfo.ToString();
+     }
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+ /// <summary>
+ /// Visitor over the command types: one process* method per kind of command,
+ /// each taking that command (or the id it refers to) and returning a Response.
+ /// </summary>
+ /// <remarks>
+ /// Several TransactionInfo-based methods exist because the same command type
+ /// is dispatched to a different method per transaction operation (begin,
+ /// prepare, commit, rollback, recover, forget, end).
+ /// NOTE(review): what a null return means is decided by the implementation
+ /// and its callers; confirm against them.
+ /// </remarks>
+ public interface ICommandVisitor
+ {
+
+ Response processAddConnection(ConnectionInfo info);
+
+ Response processAddSession(SessionInfo info);
+
+ Response processAddProducer(ProducerInfo info);
+
+ Response processAddConsumer(ConsumerInfo info);
+
+ Response processRemoveConnection(ConnectionId id);
+
+ Response processRemoveSession(SessionId id);
+
+ Response processRemoveProducer(ProducerId id);
+
+ Response processRemoveConsumer(ConsumerId id);
+
+ Response processAddDestination(DestinationInfo info);
+
+ Response processRemoveDestination(DestinationInfo info);
+
+ Response processRemoveSubscription(RemoveSubscriptionInfo info);
+
+ Response processMessage(Message send);
+
+ Response processMessageAck(MessageAck ack);
+
+ Response processMessagePull(MessagePull pull);
+
+ Response processBeginTransaction(TransactionInfo info);
+
+ Response processPrepareTransaction(TransactionInfo info);
+
+ Response processCommitTransactionOnePhase(TransactionInfo info);
+
+ Response processCommitTransactionTwoPhase(TransactionInfo info);
+
+ Response processRollbackTransaction(TransactionInfo info);
+
+ Response processWireFormat(WireFormatInfo info);
+
+ Response processKeepAlive(KeepAliveInfo info);
+
+ Response processShutdown(ShutdownInfo info);
+
+ Response processFlush(FlushCommand command);
+
+ Response processBrokerInfo(BrokerInfo info);
+
+ Response processRecoverTransactions(TransactionInfo info);
+
+ Response processForgetTransaction(TransactionInfo info);
+
+ Response processEndTransaction(TransactionInfo info);
+
+ Response processMessageDispatchNotification(MessageDispatchNotification notification);
+
+ Response processProducerAck(ProducerAck ack);
+
+ Response processMessageDispatch(MessageDispatch dispatch);
+
+ Response processControlCommand(ControlCommand command);
+
+ Response processConnectionError(ConnectionError error);
+
+ Response processConnectionControl(ConnectionControl control);
+
+ Response processConsumerControl(ConsumerControl control);
+
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ProducerState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ProducerState.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ProducerState.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ProducerState.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+ /// <summary>
+ /// Wrapper that holds the ProducerInfo of one tracked producer.
+ /// </summary>
+ public class ProducerState
+ {
+     private ProducerInfo producerInfo;
+
+     /// <summary>Creates the state for the given producer description.</summary>
+     /// <param name="info">Producer description to hold.</param>
+     public ProducerState(ProducerInfo info)
+     {
+         producerInfo = info;
+     }
+
+     /// <summary>The producer description passed to the constructor.</summary>
+     public ProducerInfo Info
+     {
+         get { return producerInfo; }
+     }
+
+     /// <summary>Delegates to the held ProducerInfo's string form.</summary>
+     public override String ToString()
+     {
+         return producerInfo.ToString();
+     }
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SessionState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SessionState.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SessionState.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SessionState.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+ /// <summary>
+ /// Holds what is tracked for a single session: its SessionInfo and the
+ /// producers and consumers registered against it.
+ /// </summary>
+ public class SessionState
+ {
+     SessionInfo info;
+
+     private SynchronizedDictionary<ProducerId, ProducerState> producers = new SynchronizedDictionary<ProducerId, ProducerState>();
+     private SynchronizedDictionary<ConsumerId, ConsumerState> consumers = new SynchronizedDictionary<ConsumerId, ConsumerState>();
+     private AtomicBoolean _shutdown = new AtomicBoolean(false);
+
+     /// <summary>Creates the state for the given session description.</summary>
+     /// <param name="info">Session description to track.</param>
+     public SessionState(SessionInfo info)
+     {
+         this.info = info;
+     }
+
+     /// <summary>Delegates to the tracked SessionInfo's string form.</summary>
+     public override String ToString()
+     {
+         return info.ToString();
+     }
+
+     /// <summary>Starts tracking a producer. Throws if this state has been shut down.</summary>
+     /// <param name="info">Producer description, stored under its ProducerId.</param>
+     public void addProducer(ProducerInfo info)
+     {
+         checkShutdown();
+         producers.Add(info.ProducerId, new ProducerState(info));
+     }
+
+     /// <summary>Stops tracking a producer and returns the state that was stored for it.</summary>
+     public ProducerState removeProducer(ProducerId id)
+     {
+         ProducerState ret = producers[id];
+         producers.Remove(id);
+         return ret;
+     }
+
+     /// <summary>Starts tracking a consumer. Throws if this state has been shut down.</summary>
+     /// <param name="info">Consumer description, stored under its ConsumerId.</param>
+     public void addConsumer(ConsumerInfo info)
+     {
+         checkShutdown();
+         consumers.Add(info.ConsumerId, new ConsumerState(info));
+     }
+
+     /// <summary>Stops tracking a consumer and returns the state that was stored for it.</summary>
+     public ConsumerState removeConsumer(ConsumerId id)
+     {
+         ConsumerState ret = consumers[id];
+         consumers.Remove(id);
+         return ret;
+     }
+
+     /// <summary>Session description passed to the constructor.</summary>
+     public SessionInfo Info
+     {
+         get
+         {
+             return info;
+         }
+     }
+
+     /// <summary>Ids of all tracked consumers.</summary>
+     public SynchronizedCollection<ConsumerId> ConsumerIds
+     {
+         get
+         {
+             return consumers.Keys;
+         }
+     }
+
+     /// <summary>Ids of all tracked producers.</summary>
+     public SynchronizedCollection<ProducerId> ProducerIds
+     {
+         get
+         {
+             return producers.Keys;
+         }
+     }
+
+     /// <summary>All tracked producer states.</summary>
+     public SynchronizedCollection<ProducerState> ProducerStates
+     {
+         get
+         {
+             return producers.Values;
+         }
+     }
+
+     /// <summary>Producer state stored under the given id (same lookup as the indexer).</summary>
+     public ProducerState getProducerState(ProducerId producerId)
+     {
+         return producers[producerId];
+     }
+
+     /// <summary>Producer state stored under the given id.</summary>
+     public ProducerState this[ProducerId producerId]
+     {
+         get
+         {
+             return producers[producerId];
+         }
+     }
+
+     /// <summary>All tracked consumer states.</summary>
+     public SynchronizedCollection<ConsumerState> ConsumerStates
+     {
+         get
+         {
+             return consumers.Values;
+         }
+     }
+
+     /// <summary>Consumer state stored under the given id (same lookup as the indexer).</summary>
+     public ConsumerState getConsumerState(ConsumerId consumerId)
+     {
+         return consumers[consumerId];
+     }
+
+     /// <summary>Consumer state stored under the given id.</summary>
+     public ConsumerState this[ConsumerId consumerId]
+     {
+         get
+         {
+             return consumers[consumerId];
+         }
+     }
+
+     // Rejects addProducer/addConsumer once shutdown() has run.
+     private void checkShutdown()
+     {
+         if(_shutdown.Value)
+         {
+             throw new ApplicationException("Disposed");
+         }
+     }
+
+     /// <summary>
+     /// Marks this session state as shut down; later addProducer/addConsumer
+     /// calls throw.
+     /// </summary>
+     public void shutdown()
+     {
+         // Must be true: writing false here left the flag permanently unset,
+         // so checkShutdown() could never reject anything.
+         _shutdown.Value = true;
+     }
+
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SynchronizedObjects.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SynchronizedObjects.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SynchronizedObjects.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SynchronizedObjects.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+using System.Collections.Generic;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+ /// <summary>
+ /// A list of <typeparamref name="TValue"/> references stored in an ArrayList,
+ /// where every member runs under one private lock.
+ /// </summary>
+ /// <remarks>
+ /// Enumerators returned by this class walk a snapshot taken under the lock,
+ /// so iterating is safe while other threads add or remove items; changes made
+ /// after the enumerator was obtained are not observed by it.
+ /// </remarks>
+ public class SynchronizedCollection<TValue>
+     where TValue : class
+ {
+     // Guards every access to _collection.
+     private readonly Object myLock = new Object();
+     private readonly ArrayList _collection;
+
+     /// <summary>Creates an empty collection.</summary>
+     public SynchronizedCollection()
+     {
+         _collection = new ArrayList();
+     }
+
+     /// <summary>Creates a collection holding a copy of the elements of <paramref name="c"/>.</summary>
+     /// <param name="c">Source elements; copied, not wrapped.</param>
+     public SynchronizedCollection(ICollection c)
+     {
+         _collection = new ArrayList(c);
+     }
+
+     /// <summary>Number of elements currently stored.</summary>
+     public int Count
+     {
+         get
+         {
+             lock(myLock)
+             {
+                 return _collection.Count;
+             }
+         }
+     }
+
+     /// <summary>Always false: the collection can be modified.</summary>
+     public bool IsReadOnly
+     {
+         get
+         {
+             return false;
+         }
+     }
+
+     /// <summary>Appends <paramref name="v"/>.</summary>
+     /// <returns>The index at which the value was stored.</returns>
+     public int Add(TValue v)
+     {
+         lock(myLock)
+         {
+             return _collection.Add(v);
+         }
+     }
+
+     /// <summary>Removes every element.</summary>
+     public void Clear()
+     {
+         lock(myLock)
+         {
+             _collection.Clear();
+         }
+     }
+
+     /// <summary>Reports whether <paramref name="v"/> is present.</summary>
+     public bool Contains(TValue v)
+     {
+         lock(myLock)
+         {
+             return _collection.Contains(v);
+         }
+     }
+
+     /// <summary>Copies all elements into <paramref name="a"/>, starting at <paramref name="index"/>.</summary>
+     public void CopyTo(TValue[] a, int index)
+     {
+         lock(myLock)
+         {
+             _collection.CopyTo(a, index);
+         }
+     }
+
+     /// <summary>Removes the first occurrence of <paramref name="v"/>, if any.</summary>
+     public void Remove(TValue v)
+     {
+         lock(myLock)
+         {
+             _collection.Remove(v);
+         }
+     }
+
+     /// <summary>Removes the element at <paramref name="index"/>.</summary>
+     public void RemoveAt(int index)
+     {
+         lock(myLock)
+         {
+             _collection.RemoveAt(index);
+         }
+     }
+
+     /// <summary>Gets or sets the element at <paramref name="index"/>.</summary>
+     public TValue this[int index]
+     {
+         get
+         {
+             lock(myLock)
+             {
+                 return (TValue) _collection[index];
+             }
+         }
+         set
+         {
+             lock(myLock)
+             {
+                 _collection[index] = value;
+             }
+         }
+     }
+
+     /// <summary>
+     /// Returns an enumerator over a snapshot of the current contents.
+     /// </summary>
+     public IEnumerator GetEnumerator()
+     {
+         lock(myLock)
+         {
+             // The lock is released as soon as this method returns, so a live
+             // ArrayList enumerator would be walked unsynchronized and would
+             // throw on any concurrent modification. Enumerate a copy instead.
+             ArrayList snapshot = (ArrayList) _collection.Clone();
+             return snapshot.GetEnumerator();
+         }
+     }
+
+     /// <summary>
+     /// Returns an enumerator over <paramref name="count"/> elements of a
+     /// snapshot of the current contents, starting at <paramref name="index"/>.
+     /// </summary>
+     public IEnumerator GetEnumerator(int index, int count)
+     {
+         lock(myLock)
+         {
+             // Same reasoning as the parameterless overload; range validation
+             // is still performed by ArrayList.GetEnumerator(int, int).
+             ArrayList snapshot = (ArrayList) _collection.Clone();
+             return snapshot.GetEnumerator(index, count);
+         }
+     }
+
+ }
+
+ /// <summary>
+ /// A Dictionary wrapper in which every member runs under one private lock.
+ /// </summary>
+ /// <remarks>
+ /// Keys and Values return independent copies, so callers can iterate them
+ /// without holding the dictionary's lock.
+ /// </remarks>
+ public class SynchronizedDictionary<TKey, TValue>
+     where TKey : class
+     where TValue : class
+ {
+     // Guards every access to _dictionary.
+     private readonly Object myLock = new Object();
+     private readonly Dictionary<TKey, TValue> _dictionary = new Dictionary<TKey, TValue>();
+
+     /// <summary>Removes every entry.</summary>
+     public void Clear()
+     {
+         // Clear() used to run without the lock and could race with the
+         // locked readers and writers below, corrupting the dictionary.
+         lock(myLock)
+         {
+             _dictionary.Clear();
+         }
+     }
+
+     /// <summary>
+     /// Gets or sets the value stored under <paramref name="key"/>.
+     /// The getter throws KeyNotFoundException when the key is absent.
+     /// </summary>
+     public TValue this[TKey key]
+     {
+         get
+         {
+             lock(myLock)
+             {
+                 return _dictionary[key];
+             }
+         }
+         set
+         {
+             lock(myLock)
+             {
+                 _dictionary[key] = value;
+             }
+         }
+     }
+
+     /// <summary>A copy of the current keys.</summary>
+     public SynchronizedCollection<TKey> Keys
+     {
+         get
+         {
+             lock(myLock)
+             {
+                 return new SynchronizedCollection<TKey>(_dictionary.Keys);
+             }
+         }
+     }
+
+     /// <summary>A copy of the current values.</summary>
+     public SynchronizedCollection<TValue> Values
+     {
+         get
+         {
+             lock(myLock)
+             {
+                 return new SynchronizedCollection<TValue>(_dictionary.Values);
+             }
+         }
+     }
+
+     /// <summary>
+     /// Adds a new entry; throws ArgumentException when <paramref name="k"/>
+     /// is already present.
+     /// </summary>
+     public void Add(TKey k, TValue v)
+     {
+         lock(myLock)
+         {
+             _dictionary.Add(k, v);
+         }
+     }
+
+     /// <summary>Removes the entry stored under <paramref name="v"/>.</summary>
+     /// <returns>true when an entry was found and removed; otherwise false.</returns>
+     public bool Remove(TKey v)
+     {
+         lock(myLock)
+         {
+             return _dictionary.Remove(v);
+         }
+     }
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ThreadSimulator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ThreadSimulator.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ThreadSimulator.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ThreadSimulator.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+ /// <summary>
+ /// Base class for a unit of work exposed through a single Run() method.
+ /// Subclasses are expected to override Run(); the base implementation
+ /// only reports that it was not overridden.
+ /// </summary>
+ public abstract class ThreadSimulator
+ {
+     /// <summary>Executes the work. Must be overridden.</summary>
+     /// <exception cref="ApplicationException">Always, when the base implementation is reached.</exception>
+     public virtual void Run()
+     {
+         const string notOverridden = "ThreadSimulator.Run() should be overridden.";
+         throw new ApplicationException(notOverridden);
+     }
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/Tracked.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/Tracked.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/Tracked.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/Tracked.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+ /// <summary>
+ /// A Response that carries an optional callback to run once, when
+ /// onResponses() is invoked.
+ /// </summary>
+ public class Tracked : Response
+ {
+     // Pending callback; null once it has run (or if none was supplied).
+     private ThreadSimulator runnable = null;
+
+     /// <summary>Creates a tracked response with the given callback.</summary>
+     /// <param name="runnable">Callback to run from onResponses(); may be null.</param>
+     public Tracked(ThreadSimulator runnable)
+     {
+         this.runnable = runnable;
+     }
+
+     /// <summary>
+     /// Runs the pending callback, if there is one, and then forgets it so
+     /// that it is not run again.
+     /// </summary>
+     public void onResponses()
+     {
+         if(runnable == null)
+         {
+             return;
+         }
+
+         runnable.Run();
+         runnable = null;
+     }
+
+     /// <summary>True while a callback is still pending.</summary>
+     public virtual bool WaitingForResponse
+     {
+         get { return runnable != null; }
+     }
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/TransactionState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/TransactionState.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/TransactionState.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/TransactionState.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+ /// <summary>
+ /// Records the commands issued for one transaction together with its
+ /// prepared flag and prepared result.
+ /// </summary>
+ public class TransactionState
+ {
+
+     private List<Command> commands = new List<Command>();
+     private TransactionId id;
+     // Set by shutdown(); once true, addCommand() is rejected.
+     private AtomicBoolean _shutdown = new AtomicBoolean(false);
+     private bool prepared;
+     private int preparedResult;
+
+     /// <summary>Creates the state for the transaction identified by <paramref name="id"/>.</summary>
+     public TransactionState(TransactionId id)
+     {
+         this.id = id;
+     }
+
+     /// <summary>Returns the string form of the transaction id.</summary>
+     public override String ToString()
+     {
+         return id.ToString();
+     }
+
+     /// <summary>Appends a command to this transaction's command list.</summary>
+     /// <exception cref="ApplicationException">shutdown() has been called.</exception>
+     public void addCommand(Command operation)
+     {
+         checkShutdown();
+         commands.Add(operation);
+     }
+
+     /// <summary>The recorded commands, in the order they were added (the live list, not a copy).</summary>
+     public List<Command> Commands
+     {
+         get
+         {
+             return commands;
+         }
+     }
+
+     /// <summary>Throws when the shutdown flag is set.</summary>
+     private void checkShutdown()
+     {
+         if(_shutdown.Value)
+         {
+             throw new ApplicationException("Disposed");
+         }
+     }
+
+     /// <summary>
+     /// Marks this transaction state as shut down; later addCommand() calls
+     /// fail with ApplicationException("Disposed").
+     /// </summary>
+     public void shutdown()
+     {
+         // checkShutdown() throws only while the flag is true. The previous
+         // code stored false here, so shutdown() had no effect at all.
+         _shutdown.Value = true;
+     }
+
+     /// <summary>Returns the id this state was created with.</summary>
+     public TransactionId getId()
+     {
+         return id;
+     }
+
+     /// <summary>Whether the transaction has been marked as prepared.</summary>
+     public bool Prepared
+     {
+         get
+         {
+             return prepared;
+         }
+         set
+         {
+             prepared = value;
+         }
+     }
+
+     /// <summary>The result value recorded for the prepare step.</summary>
+     public int PreparedResult
+     {
+         get
+         {
+             return preparedResult;
+         }
+         set
+         {
+             preparedResult = value;
+         }
+     }
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DefaultThreadPools.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DefaultThreadPools.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DefaultThreadPools.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DefaultThreadPools.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.ActiveMQ.Threads
+{
+ /// <summary>
+ /// Holder for the shared, default TaskRunnerFactory. The class cannot be
+ /// instantiated; use the static DefaultTaskRunnerFactory property.
+ /// </summary>
+ public class DefaultThreadPools
+ {
+     /***
+      * Java's execution model is different enough that the Executor
+      * concept has been left out of this implementation. This must be
+      * reviewed to see what is appropriate for the future.
+      * -Allan Schrum
+     private static Executor DEFAULT_POOL = null;
+     static {
+         DEFAULT_POOL = new ScheduledThreadPoolExecutor(5, new ThreadFactory()
+         {
+             public Thread newThread(Runnable runnable)
+             {
+                 Thread thread = new Thread(runnable, "ActiveMQ Default Thread Pool Thread");
+                 thread.setDaemon(true);
+                 return thread;
+             }
+         });
+     }
+
+     public static Executor DefaultPool
+     {
+         get { return DEFAULT_POOL; }
+     }
+      ***/
+
+     // Single factory instance created when the type is initialized.
+     private static TaskRunnerFactory DEFAULT_TASK_RUNNER_FACTORY = new TaskRunnerFactory();
+
+     // Private so that no instances of this holder can be created.
+     private DefaultThreadPools()
+     {
+     }
+
+     /// <summary>The shared TaskRunnerFactory instance.</summary>
+     public static TaskRunnerFactory DefaultTaskRunnerFactory
+     {
+         get { return DEFAULT_TASK_RUNNER_FACTORY; }
+     }
+
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+
+namespace Apache.NMS.ActiveMQ.Threads
+{
+ /// <summary>
+ /// TaskRunner that executes its Task on the .NET ThreadPool. Each queued
+ /// work item calls Task.iterate() at most maxIterationsPerRun times and
+ /// re-queues itself while the task reports more work or wakeup() was called
+ /// in the meantime.
+ /// </summary>
+ class PooledTaskRunner : TaskRunner
+ {
+     private int maxIterationsPerRun;
+     private Task task;
+     // Guards queued, _shutdown and iterating; also the monitor on which
+     // shutdown(int) waits for a running iteration to finish.
+     private Object runable = new Object();
+     private bool queued;
+     private bool _shutdown;
+     private bool iterating;
+     private volatile System.Threading.Thread runningThread;
+
+     /// <summary>
+     /// ThreadPool entry point: runs one batch of iterations for the runner
+     /// passed as <paramref name="o"/>.
+     /// </summary>
+     /// <param name="o">The PooledTaskRunner to run (the state object given to QueueUserWorkItem).</param>
+     /// <exception cref="ArgumentException"><paramref name="o"/> is not a PooledTaskRunner.</exception>
+     public void run(Object o)
+     {
+         PooledTaskRunner p = o as PooledTaskRunner;
+         if(p == null)
+         {
+             // Fail with a clear message instead of a NullReferenceException below.
+             throw new ArgumentException("Expected a PooledTaskRunner instance.", "o");
+         }
+
+         p.runningThread = System.Threading.Thread.CurrentThread;
+         try
+         {
+             p.runTask();
+         }
+         finally
+         {
+             p.runningThread = null;
+         }
+     }
+
+     /// <summary>
+     /// Creates the runner and immediately queues a first run on the ThreadPool.
+     /// </summary>
+     /// <param name="task">The task to iterate.</param>
+     /// <param name="maxIterationsPerRun">Upper bound of iterate() calls per queued work item.</param>
+     public PooledTaskRunner(Task task, int maxIterationsPerRun)
+     {
+         this.maxIterationsPerRun = maxIterationsPerRun;
+         this.task = task;
+         this._shutdown = false;
+         this.iterating = false;
+         this.queued = true;
+         ThreadPool.QueueUserWorkItem(new WaitCallback(run), this);
+     }
+
+     /**
+      * We Expect MANY wakeup calls on the same TaskRunner.
+      */
+     public void wakeup()
+     {
+         lock(runable)
+         {
+
+             // When we get in here, we make some assumptions of state:
+             // queued=false, iterating=false: wakeup() has not been called and
+             // therefore task is not executing.
+             // queued=true, iterating=false: wakeup() was called but, task
+             // execution has not started yet
+             // queued=false, iterating=true : wakeup() was called, which caused
+             // task execution to start.
+             // queued=true, iterating=true : wakeup() called after task
+             // execution was started.
+
+             if(queued || _shutdown)
+             {
+                 return;
+             }
+
+             queued = true;
+
+             // The runTask() method will do this for me once we are done
+             // iterating.
+             if(!iterating)
+             {
+                 ThreadPool.QueueUserWorkItem(new WaitCallback(run), this);
+             }
+         }
+     }
+
+     /// <summary>
+     /// Shuts the runner down: no further runs are queued. If an iteration is
+     /// in progress on another thread, waits up to <paramref name="timeout"/>
+     /// milliseconds for it to finish.
+     /// </summary>
+     /// <param name="timeout">
+     /// Maximum wait in milliseconds; 0 returns without waiting and -1 waits
+     /// until the iteration has finished.
+     /// </param>
+     public void shutdown(int timeout)
+     {
+         lock(runable)
+         {
+             _shutdown = true;
+             // the check on the thread is done
+             // because a call to iterate can result in
+             // shutDown() being called, which would wait forever
+             // waiting for iterating to finish
+             if(runningThread != System.Threading.Thread.CurrentThread)
+             {
+                 if(iterating)
+                 {
+                     // Monitor.Wait releases the lock while waiting, so the
+                     // worker can enter its finally block, clear 'iterating'
+                     // and pulse us. Sleeping here instead kept the lock held,
+                     // blocking both the worker and every wakeup() caller for
+                     // the whole timeout.
+                     System.Threading.Monitor.Wait(runable, timeout);
+                 }
+             }
+         }
+     }
+
+     /// <summary>Shuts the runner down without waiting for a running iteration.</summary>
+     public void shutdown()
+     {
+         shutdown(0);
+     }
+
+     /// <summary>
+     /// Runs one batch of iterations and decides whether another run has to
+     /// be queued.
+     /// </summary>
+     void runTask()
+     {
+
+         lock(runable)
+         {
+             queued = false;
+             if(_shutdown)
+             {
+                 iterating = false;
+                 // Release anyone blocked in shutdown(int).
+                 System.Threading.Monitor.PulseAll(runable);
+                 return;
+             }
+             iterating = true;
+         }
+
+         // Don't synchronize while we are iterating so that
+         // multiple wakeup() calls can be executed concurrently.
+         bool done = false;
+         try
+         {
+             for(int i = 0; i < maxIterationsPerRun; i++)
+             {
+                 if(!task.iterate())
+                 {
+                     done = true;
+                     break;
+                 }
+             }
+         }
+         finally
+         {
+             lock(runable)
+             {
+                 iterating = false;
+                 // Tell a waiting shutdown(int) that the iteration is over.
+                 System.Threading.Monitor.PulseAll(runable);
+                 if(_shutdown)
+                 {
+                     queued = false;
+                 }
+                 else
+                 {
+                     // If we could not iterate all the items
+                     // then we need to re-queue.
+                     if(!done)
+                     {
+                         queued = true;
+                     }
+
+                     if(queued)
+                     {
+                         ThreadPool.QueueUserWorkItem(new WaitCallback(run), this);
+                     }
+                 }
+             }
+         }
+     }
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.ActiveMQ.Threads
+{
+ /// <summary>
+ /// Represents a task that may take a few iterations to complete.
+ /// </summary>
+ public interface Task
+ {
+ /// <summary>
+ /// Performs one iteration of the task.
+ /// </summary>
+ /// <returns>
+ /// false when there is nothing more to do for now; true when another call
+ /// is wanted. PooledTaskRunner ends its current run on false and keeps
+ /// calling (up to its per-run limit, then re-queues itself) on true.
+ /// </returns>
+ bool iterate();
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+namespace Apache.NMS.ActiveMQ.Threads
+{
+ /// <summary>
+ /// Allows you to request a thread execute the associated Task.
+ /// </summary>
+ public interface TaskRunner
+ {
+ /// <summary>
+ /// Requests that the associated Task be executed.
+ /// </summary>
+ void wakeup();
+ /// <summary>
+ /// Shuts the runner down. PooledTaskRunner implements this as shutdown(0).
+ /// </summary>
+ void shutdown();
+ /// <summary>
+ /// Shuts the runner down, allowing a wait bounded by <paramref name="timeout"/>.
+ /// </summary>
+ /// <param name="timeout">
+ /// Wait limit; PooledTaskRunner treats it as a number of milliseconds.
+ /// NOTE(review): the unit is not stated by this interface itself - confirm
+ /// for other implementations.
+ /// </param>
+ void shutdown(int timeout);
+ }
+}