You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/06/25 19:41:11 UTC
svn commit: r958048 -
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
Author: tabish
Date: Fri Jun 25 17:41:11 2010
New Revision: 958048
URL: http://svn.apache.org/viewvc?rev=958048&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-258
Lowered the level of the unknown response log statement to Debug.
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs?rev=958048&r1=958047&r2=958048&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs Fri Jun 25 17:41:11 2010
@@ -22,124 +22,127 @@ using Apache.NMS.ActiveMQ.Commands;
namespace Apache.NMS.ActiveMQ.Transport
{
- /// <summary>
- /// A Transport that correlates asynchronous send/receive messages into single request/response.
- /// </summary>
- public class ResponseCorrelator : TransportFilter
- {
- private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable());
- private int nextCommandId;
-
- public ResponseCorrelator(ITransport next) : base(next)
- {
- }
-
- protected override void OnException(ITransport sender, Exception command)
- {
- base.OnException(sender, command);
-
- foreach(DictionaryEntry entry in requestMap)
- {
- FutureResponse value = (FutureResponse) entry.Value;
- ExceptionResponse response = new ExceptionResponse();
- BrokerError error = new BrokerError();
-
- error.Message = command.Message;
- response.Exception = error;
- value.Response = response;
- }
+ /// <summary>
+ /// A Transport that correlates asynchronous send/receive messages into single request/response.
+ /// </summary>
+ public class ResponseCorrelator : TransportFilter
+ {
+ private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable());
+ private int nextCommandId;
+
+ public ResponseCorrelator(ITransport next) : base(next)
+ {
+ }
+
+ protected override void OnException(ITransport sender, Exception command)
+ {
+ base.OnException(sender, command);
+
+ foreach(DictionaryEntry entry in requestMap)
+ {
+ FutureResponse value = (FutureResponse) entry.Value;
+ ExceptionResponse response = new ExceptionResponse();
+ BrokerError error = new BrokerError();
+
+ error.Message = command.Message;
+ response.Exception = error;
+ value.Response = response;
+ }
- requestMap.Clear();
- }
+ requestMap.Clear();
+ }
- internal int GetNextCommandId()
- {
+ internal int GetNextCommandId()
+ {
return Interlocked.Increment(ref nextCommandId);
- }
+ }
- public override void Oneway(Command command)
- {
- if(0 == command.CommandId)
- {
- command.CommandId = GetNextCommandId();
- }
-
- next.Oneway(command);
- }
-
- public override FutureResponse AsyncRequest(Command command)
- {
- int commandId = GetNextCommandId();
-
- command.CommandId = commandId;
- command.ResponseRequired = true;
- FutureResponse future = new FutureResponse();
- requestMap[commandId] = future;
- next.Oneway(command);
- return future;
- }
-
- public override Response Request(Command command, TimeSpan timeout)
- {
- FutureResponse future = AsyncRequest(command);
- future.ResponseTimeout = timeout;
- Response response = future.Response;
-
- if(response != null && response is ExceptionResponse)
- {
- ExceptionResponse er = (ExceptionResponse) response;
- BrokerError brokerError = er.Exception;
-
- if (brokerError == null)
- {
- throw new BrokerException();
- }
- else
- {
- throw new BrokerException(brokerError);
- }
- }
-
- return response;
- }
-
- protected override void OnCommand(ITransport sender, Command command)
- {
- if(command is Response)
- {
- Response response = (Response) command;
- int correlationId = response.CorrelationId;
- FutureResponse future = (FutureResponse) requestMap[correlationId];
-
- if(future != null)
- {
- requestMap.Remove(correlationId);
- future.Response = response;
-
- if(response is ExceptionResponse)
- {
- ExceptionResponse er = (ExceptionResponse) response;
- BrokerError brokerError = er.Exception;
- BrokerException exception = new BrokerException(brokerError);
- this.exceptionHandler(this, exception);
- }
- }
- else
- {
- Tracer.Error("Unknown response ID: " + response.CommandId + " for response: " + response);
- }
- }
- else if(command is ShutdownInfo)
- {
- // lets shutdown
- this.commandHandler(sender, command);
- }
- else
- {
- this.commandHandler(sender, command);
- }
- }
- }
+ public override void Oneway(Command command)
+ {
+ if(0 == command.CommandId)
+ {
+ command.CommandId = GetNextCommandId();
+ }
+
+ next.Oneway(command);
+ }
+
+ public override FutureResponse AsyncRequest(Command command)
+ {
+ int commandId = GetNextCommandId();
+
+ command.CommandId = commandId;
+ command.ResponseRequired = true;
+ FutureResponse future = new FutureResponse();
+ requestMap[commandId] = future;
+ next.Oneway(command);
+ return future;
+ }
+
+ public override Response Request(Command command, TimeSpan timeout)
+ {
+ FutureResponse future = AsyncRequest(command);
+ future.ResponseTimeout = timeout;
+ Response response = future.Response;
+
+ if(response != null && response is ExceptionResponse)
+ {
+ ExceptionResponse er = (ExceptionResponse) response;
+ BrokerError brokerError = er.Exception;
+
+ if (brokerError == null)
+ {
+ throw new BrokerException();
+ }
+ else
+ {
+ throw new BrokerException(brokerError);
+ }
+ }
+
+ return response;
+ }
+
+ protected override void OnCommand(ITransport sender, Command command)
+ {
+ if(command is Response)
+ {
+ Response response = (Response) command;
+ int correlationId = response.CorrelationId;
+ FutureResponse future = (FutureResponse) requestMap[correlationId];
+
+ if(future != null)
+ {
+ requestMap.Remove(correlationId);
+ future.Response = response;
+
+ if(response is ExceptionResponse)
+ {
+ ExceptionResponse er = (ExceptionResponse) response;
+ BrokerError brokerError = er.Exception;
+ BrokerException exception = new BrokerException(brokerError);
+ this.exceptionHandler(this, exception);
+ }
+ }
+ else
+ {
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("Unknown response ID: " + response.CommandId + " for response: " + response);
+ }
+ }
+ }
+ else if(command is ShutdownInfo)
+ {
+ // lets shutdown
+ this.commandHandler(sender, command);
+ }
+ else
+ {
+ this.commandHandler(sender, command);
+ }
+ }
+ }
}