You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/07/08 19:46:16 UTC
svn commit: r1144402 - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/
main/csharp/Commands/ main/csharp/Util/ test/csharp/
Author: tabish
Date: Fri Jul 8 17:46:16 2011
New Revision: 1144402
URL: http://svn.apache.org/viewvc?rev=1144402&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQNET-334
Added:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/AdvisoryConsumer.cs (with props)
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFailedException.cs (with props)
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/AdvisorySupport.cs (with props)
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/TempDestinationTest.cs (with props)
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQDestination.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/AdvisoryConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/AdvisoryConsumer.cs?rev=1144402&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/AdvisoryConsumer.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/AdvisoryConsumer.cs Fri Jul 8 17:46:16 2011
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+using Apache.NMS.ActiveMQ.Util;
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ
+{
+ /// <summary>
+ /// Consumes Advisory Messages for Temp Destination creation or deletion so that
+ /// the connection can track valid destinations for its sessions, and session resources.
+ /// </summary>
+ internal class AdvisoryConsumer : IDispatcher
+ {
+     private readonly Connection connection;
+     private readonly ConsumerInfo info;
+
+     private bool closed = false;
+     private int deliveredCounter = 0;
+
+     /// <summary>
+     /// Builds a ConsumerInfo for the composite temp destination advisory topic
+     /// (prefetch 1000, NoLocal), registers this instance as the dispatcher for
+     /// the given consumer id and then sends the ConsumerInfo through
+     /// Connection.SyncRequest.
+     /// </summary>
+     /// <param name="connection">Connection this consumer reports temp destination changes to.</param>
+     /// <param name="consumerId">Id used for the subscription and the dispatcher registration.</param>
+     internal AdvisoryConsumer(Connection connection, ConsumerId consumerId) : base()
+     {
+         this.connection = connection;
+         this.info = new ConsumerInfo();
+         this.info.ConsumerId = consumerId;
+         this.info.Destination = AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
+         this.info.PrefetchSize = 1000;
+         this.info.NoLocal = true;
+
+         // Register before subscribing so that a dispatch arriving right after
+         // the request is sent already has a dispatcher to go to.
+         this.connection.addDispatcher(consumerId, this);
+         this.connection.SyncRequest(this.info);
+     }
+
+     /// <summary>
+     /// Sends a RemoveInfo for this consumer and unregisters its dispatcher.
+     /// Only the first call does any work. A failure while sending the remove
+     /// is logged at debug level and otherwise ignored.
+     /// </summary>
+     internal void Dispose()
+     {
+         if(!closed)
+         {
+             this.closed = true;
+             try
+             {
+                 RemoveInfo removeIt = new RemoveInfo();
+                 removeIt.ObjectId = this.info.ConsumerId;
+                 this.connection.Oneway(removeIt);
+             }
+             catch(Exception e)
+             {
+                 Tracer.Debug("Failed to send remove for AdvisoryConsumer: " + e.Message);
+             }
+             this.connection.removeDispatcher(this.info.ConsumerId);
+         }
+     }
+
+     /// <summary>
+     /// Handles one dispatched advisory message. Once more than 75% of the
+     /// prefetch has been delivered a single ConsumedAck covering the delivered
+     /// count is sent and the counter is reset; an exception raised while sending
+     /// it is passed to Connection.OnAsyncException. A DestinationInfo payload is
+     /// then forwarded to ProcessDestinationInfo, anything else is only logged.
+     /// </summary>
+     /// <param name="messageDispatch">The dispatch received for this consumer.</param>
+     public void Dispatch(MessageDispatch messageDispatch)
+     {
+         // Auto ack messages when we reach 75% of the prefetch
+         deliveredCounter++;
+
+         if(deliveredCounter > (0.75 * this.info.PrefetchSize))
+         {
+             try
+             {
+                 MessageAck ack = new MessageAck();
+                 ack.AckType = (byte)AckType.ConsumedAck;
+                 // An ack has to name the subscription and destination it belongs
+                 // to, otherwise the receiver has nothing to match it against.
+                 ack.ConsumerId = this.info.ConsumerId;
+                 ack.Destination = messageDispatch.Destination;
+                 ack.FirstMessageId = messageDispatch.Message.MessageId;
+                 ack.MessageCount = deliveredCounter;
+
+                 this.connection.Oneway(ack);
+                 this.deliveredCounter = 0;
+             }
+             catch(Exception e)
+             {
+                 this.connection.OnAsyncException(e);
+             }
+         }
+
+         DestinationInfo destInfo = messageDispatch.Message.DataStructure as DestinationInfo;
+         if(destInfo != null)
+         {
+             ProcessDestinationInfo(destInfo);
+         }
+         else
+         {
+             // This can happen across networks
+             Tracer.Debug("Unexpected message was dispatched to the AdvisoryConsumer: " + messageDispatch);
+         }
+     }
+
+     /// <summary>
+     /// Applies an add or remove advisory for a temporary destination to the
+     /// connection's temp destination tracking. Advisories for non temporary
+     /// destinations and operation types other than add or remove are ignored.
+     /// </summary>
+     /// <param name="destInfo">The DestinationInfo carried by the advisory message.</param>
+     private void ProcessDestinationInfo(DestinationInfo destInfo)
+     {
+         ActiveMQDestination dest = destInfo.Destination;
+         if(!dest.IsTemporary)
+         {
+             return;
+         }
+
+         ActiveMQTempDestination tempDest = dest as ActiveMQTempDestination;
+         if(tempDest == null)
+         {
+             // The cast failed, so there is nothing that can be tracked; a null
+             // key must not be handed to the connection's temp destination map.
+             return;
+         }
+
+         if(destInfo.OperationType == DestinationInfo.ADD_OPERATION_TYPE)
+         {
+             if(Tracer.IsDebugEnabled)
+             {
+                 Tracer.Debug("AdvisoryConsumer adding: " + tempDest);
+             }
+             this.connection.AddTempDestination(tempDest);
+         }
+         else if(destInfo.OperationType == DestinationInfo.REMOVE_OPERATION_TYPE)
+         {
+             if(Tracer.IsDebugEnabled)
+             {
+                 Tracer.Debug("AdvisoryConsumer removing: " + tempDest);
+             }
+             this.connection.RemoveTempDestination(tempDest);
+         }
+     }
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/AdvisoryConsumer.cs
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQDestination.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQDestination.cs?rev=1144402&r1=1144401&r2=1144402&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQDestination.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQDestination.cs Fri Jul 8 17:46:16 2011
@@ -17,6 +17,7 @@
using System;
using System.Collections.Specialized;
+using System.Collections.Generic;
using Apache.NMS.Util;
namespace Apache.NMS.ActiveMQ.Commands
@@ -423,6 +424,30 @@ namespace Apache.NMS.ActiveMQ.Commands
}
}
+ /// <summary>
+ /// Translates the value reported by GetDestinationType() into its textual
+ /// name, which is one of "Queue", "Topic", "TempQueue" or "TempTopic".
+ /// </summary>
+ /// <returns>
+ /// The name of the Destination Type.
+ /// </returns>
+ /// <exception cref="NMSException">
+ /// Thrown when GetDestinationType() reports none of the four known types.
+ /// </exception>
+ public String GetDestinationTypeAsString()
+ {
+     String typeName;
+
+     switch(GetDestinationType())
+     {
+         case ACTIVEMQ_QUEUE:
+             typeName = "Queue";
+             break;
+         case ACTIVEMQ_TOPIC:
+             typeName = "Topic";
+             break;
+         case ACTIVEMQ_TEMPORARY_QUEUE:
+             typeName = "TempQueue";
+             break;
+         case ACTIVEMQ_TEMPORARY_TOPIC:
+             typeName = "TempTopic";
+             break;
+         default:
+             throw new NMSException("Invalid destination type: " + GetDestinationType());
+     }
+
+     return typeName;
+ }
+
/// <summary>
/// Returns true if this destination represents a collection of
/// destinations; allowing a set of destinations to be published to or subscribed
@@ -439,30 +464,32 @@ namespace Apache.NMS.ActiveMQ.Commands
}
}
- /*public List GetChildDestinations() {
- List answer = new ArrayList();
- StringTokenizer iter = new StringTokenizer(physicalName, COMPOSITE_SEPARATOR);
- while (iter.hasMoreTokens()) {
- String name = iter.nextToken();
- Destination child = null;
- if (name.StartsWith(QUEUE_PREFIX)) {
- child = new ActiveMQQueue(name.Substring(QUEUE_PREFIX.Length));
- }
- else if (name.StartsWith(TOPIC_PREFIX)) {
- child = new ActiveMQTopic(name.Substring(TOPIC_PREFIX.Length));
- }
- else {
- child = createDestination(name);
- }
- answer.add(child);
- }
- if (answer.size() == 1) {
- // lets put ourselves inside the collection
- // as we are not really a composite destination
- answer.set(0, this);
- }
- return answer;
- }*/
+ /// <summary>
+ /// Splits the physical name of a composite destination on the characters of
+ /// COMPOSITE_SEPARATOR and creates one destination, via CreateDestination,
+ /// for every non-blank trimmed token, in the order the tokens appear.
+ /// </summary>
+ /// <returns>
+ /// The created destinations, or an empty array when this destination is
+ /// not composite.
+ /// </returns>
+ public ActiveMQDestination[] GetCompositeDestinations()
+ {
+     if(!IsComposite)
+     {
+         return new ActiveMQDestination[0];
+     }
+
+     // Collect the usable names first so that no destination is created
+     // until the whole physical name has been scanned.
+     List<String> names = new List<String>();
+     foreach(String token in physicalName.Split(COMPOSITE_SEPARATOR.ToCharArray()))
+     {
+         String name = token.Trim();
+         if(!String.IsNullOrEmpty(name))
+         {
+             names.Add(name);
+         }
+     }
+
+     ActiveMQDestination[] compositeDestinations = new ActiveMQDestination[names.Count];
+     for(int i = 0; i < names.Count; i++)
+     {
+         compositeDestinations[i] = CreateDestination(names[i]);
+     }
+
+     return compositeDestinations;
+ }
/// <summary>
/// </summary>
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=1144402&r1=1144401&r2=1144402&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Fri Jul 8 17:46:16 2011
@@ -18,6 +18,7 @@
using System;
using System.Diagnostics;
using System.Collections;
+using System.Collections.Generic;
using System.Threading;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.Threads;
@@ -46,6 +47,7 @@ namespace Apache.NMS.ActiveMQ
private bool dispatchAsync = true;
private int producerWindowSize = 0;
private bool messagePrioritySupported=true;
+ private bool watchTopicAdviosires = true;
private bool userSpecifiedClientID;
private readonly Uri brokerUri;
@@ -58,6 +60,7 @@ namespace Apache.NMS.ActiveMQ
private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
+ private readonly IDictionary tempDests = Hashtable.Synchronized(new Hashtable());
private readonly object connectedLock = new object();
private readonly Atomic<bool> connected = new Atomic<bool>(false);
private readonly Atomic<bool> closed = new Atomic<bool>(false);
@@ -74,9 +77,11 @@ namespace Apache.NMS.ActiveMQ
private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
private ICompressionPolicy compressionPolicy = new CompressionPolicy();
private readonly IdGenerator clientIdGenerator;
+ private int consumerIdCounter = 0;
private volatile CountDownLatch transportInterruptionProcessingComplete;
private readonly MessageTransformation messageTransformation;
private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
+ private AdvisoryConsumer advisoryConsumer = null;
public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
{
@@ -319,6 +324,12 @@ namespace Apache.NMS.ActiveMQ
set { this.dispatchAsync = value; }
}
+ /// <summary>
+ /// Gets or sets the flag that is checked once the broker has accepted this
+ /// connection to decide whether an AdvisoryConsumer is created for it.
+ /// </summary>
+ // NOTE(review): the backing field name is misspelled ("Adviosires"); it is a
+ // private field, so renaming it everywhere it is used would not affect callers.
+ public bool WatchTopicAdvisories
+ {
+ get { return this.watchTopicAdviosires; }
+ set { this.watchTopicAdviosires = value; }
+ }
+
public string ClientId
{
get { return info.ClientId; }
@@ -535,6 +546,12 @@ namespace Apache.NMS.ActiveMQ
Tracer.Info("Connection.Close(): Closing Connection Now.");
this.closing.Value = true;
+ if(this.advisoryConsumer != null)
+ {
+ this.advisoryConsumer.Dispose();
+ this.advisoryConsumer = null;
+ }
+
lock(sessions.SyncRoot)
{
foreach(Session session in sessions)
@@ -544,6 +561,11 @@ namespace Apache.NMS.ActiveMQ
}
sessions.Clear();
+ // Delete from a snapshot: DeleteTemporaryDestination removes the entry from
+ // tempDests, and Delete() presumably routes there (TODO confirm), which would
+ // invalidate an enumerator that is still walking the live table.
+ ActiveMQTempDestination[] pendingDeletes;
+ lock(this.tempDests.SyncRoot)
+ {
+     pendingDeletes = new ActiveMQTempDestination[this.tempDests.Count];
+     this.tempDests.Values.CopyTo(pendingDeletes, 0);
+ }
+
+ foreach(ActiveMQTempDestination dest in pendingDeletes)
+ {
+     dest.Delete();
+ }
+
// Connected is true only when we've successfully sent our ConnectionInfo
// to the broker, so if we haven't announced ourselves there's no need to
// inform the broker of a remove, and if the transport is failed, why bother.
@@ -733,6 +755,13 @@ namespace Apache.NMS.ActiveMQ
if(!(response is ExceptionResponse))
{
connected.Value = true;
+ if(this.watchTopicAdviosires)
+ {
+ ConsumerId id = new ConsumerId(
+ new SessionId(info.ConnectionId, -1),
+ Interlocked.Increment(ref this.consumerIdCounter));
+ this.advisoryConsumer = new AdvisoryConsumer(this, id);
+ }
}
}
}
@@ -1090,6 +1119,7 @@ namespace Apache.NMS.ActiveMQ
this.SyncRequest(command);
destination.Connection = this;
+ this.AddTempDestination(destination);
return destination;
}
@@ -1100,6 +1130,19 @@ namespace Apache.NMS.ActiveMQ
public void DeleteTemporaryDestination(IDestination destination)
{
+ CheckClosedOrFailed();
+
+ ActiveMQTempDestination temp = destination as ActiveMQTempDestination;
+
+ // The in-use check and the tracking table only apply to temp destinations.
+ // When the cast fails temp is null, and the Hashtable behind tempDests
+ // rejects a null key, so skip straight to the delete in that case.
+ if(temp != null)
+ {
+     // Hold the list's SyncRoot while enumerating, as Close() does, so a
+     // session added or removed concurrently cannot break the enumeration.
+     lock(this.sessions.SyncRoot)
+     {
+         foreach(Session session in this.sessions)
+         {
+             if(session.IsInUse(temp))
+             {
+                 throw new NMSException("A consumer is consuming from the temporary destination");
+             }
+         }
+     }
+
+     this.tempDests.Remove(temp);
+ }
+
this.DeleteDestination(destination);
}
@@ -1184,5 +1227,49 @@ namespace Apache.NMS.ActiveMQ
}
}
}
+
+ /// <summary>
+ /// Records the given temp destination in the connection's tracking table
+ /// unless an equal entry is already present.
+ /// </summary>
+ /// <param name="dest">The temp destination to record; it is used as both key and value.</param>
+ internal void AddTempDestination(ActiveMQTempDestination dest)
+ {
+     // .NET lacks a putIfAbsent operation for Maps, so the membership test and
+     // the insert are performed together under the table's SyncRoot.
+     lock(tempDests.SyncRoot)
+     {
+         if(this.tempDests.Contains(dest))
+         {
+             return;
+         }
+
+         this.tempDests.Add(dest, dest);
+     }
+ }
+
+ /// <summary>
+ /// Removes the given temp destination from the connection's tracking table.
+ /// </summary>
+ /// <param name="dest">The temp destination whose entry is removed.</param>
+ internal void RemoveTempDestination(ActiveMQTempDestination dest)
+ {
+ this.tempDests.Remove(dest);
+ }
+
+ /// <summary>
+ /// Reports whether the given temp destination is currently considered usable.
+ /// </summary>
+ /// <param name="dest">The temp destination to look up.</param>
+ /// <returns>
+ /// Always true while no advisory consumer exists; otherwise true only when
+ /// the destination is present in the tracking table.
+ /// </returns>
+ internal bool IsTempDestinationActive(ActiveMQTempDestination dest)
+ {
+     // With no advisory consumer the table is not consulted at all, so every
+     // temp destination is reported as active.
+     return this.advisoryConsumer == null || this.tempDests.Contains(dest);
+ }
+
+ /// <summary>
+ /// Verifies that the connection is neither closed nor marked as having a
+ /// failed transport.
+ /// </summary>
+ /// <exception cref="ConnectionClosedException">Thrown by CheckClosed() when the connection is closed.</exception>
+ /// <exception cref="ConnectionFailedException">
+ /// Thrown when the transport is flagged as failed. The message is that of the
+ /// first recorded failure, which is also attached as the inner exception.
+ /// </exception>
+ protected void CheckClosedOrFailed()
+ {
+     CheckClosed();
+     if(transportFailed.Value)
+     {
+         // Read the field once so the null test and the use see the same value.
+         Exception cause = firstFailureError;
+         if(cause == null)
+         {
+             // No failure was recorded; fall back to the default message
+             // instead of failing with a NullReferenceException here.
+             throw new ConnectionFailedException();
+         }
+
+         throw new ConnectionFailedException(cause.Message, cause);
+     }
+ }
+
+ /// <summary>
+ /// Verifies that the connection has not been closed.
+ /// </summary>
+ /// <exception cref="ConnectionClosedException">Thrown when the closed flag is set.</exception>
+ protected void CheckClosed()
+ {
+     if(!closed.Value)
+     {
+         return;
+     }
+
+     throw new ConnectionClosedException();
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs?rev=1144402&r1=1144401&r2=1144402&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs Fri Jul 8 17:46:16 2011
@@ -51,6 +51,7 @@ namespace Apache.NMS.ActiveMQ
private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
private bool messagePrioritySupported=true;
+ private bool watchTopicAdvisories=true;
private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
@@ -267,6 +268,12 @@ namespace Apache.NMS.ActiveMQ
set { this.dispatchAsync = value; }
}
+ /// <summary>
+ /// Gets or sets the value that this factory copies to the WatchTopicAdvisories
+ /// property of the connections it configures. The backing field is
+ /// initialised to true.
+ /// </summary>
+ public bool WatchTopicAdvisories
+ {
+ get { return this.watchTopicAdvisories; }
+ set { this.watchTopicAdvisories = value; }
+ }
+
public bool MessagePrioritySupported
{
get { return this.messagePrioritySupported; }
@@ -410,6 +417,7 @@ namespace Apache.NMS.ActiveMQ
connection.CompressionPolicy = this.compressionPolicy.Clone() as ICompressionPolicy;
connection.ConsumerTransformer = this.consumerTransformer;
connection.ProducerTransformer = this.producerTransformer;
+ connection.WatchTopicAdvisories = this.watchTopicAdvisories;
}
protected static void ExceptionHandler(Exception ex)
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFailedException.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFailedException.cs?rev=1144402&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFailedException.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFailedException.cs Fri Jul 8 17:46:16 2011
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.ActiveMQ
+{
+ /// <summary>
+ /// Exception thrown when a connection that has failed in some way is used.
+ /// </summary>
+ [Serializable]
+ public class ConnectionFailedException : NMSException
+ {
+ /// <summary>
+ /// Creates the exception with the fixed message "The connection has failed!".
+ /// </summary>
+ public ConnectionFailedException()
+ : base("The connection has failed!")
+ {
+ }
+
+ /// <summary>
+ /// Creates the exception with the given message.
+ /// </summary>
+ /// <param name="message">Text passed to the NMSException base constructor.</param>
+ public ConnectionFailedException(string message)
+ : base(message)
+ {
+ }
+
+ /// <summary>
+ /// Creates the exception with the given message and error code.
+ /// </summary>
+ /// <param name="message">Text passed to the NMSException base constructor.</param>
+ /// <param name="errorCode">Error code passed unchanged to the NMSException base constructor.</param>
+ public ConnectionFailedException(string message, string errorCode)
+ : base(message, errorCode)
+ {
+ }
+
+ /// <summary>
+ /// Creates the exception with the given message and inner exception.
+ /// </summary>
+ /// <param name="message">Text passed to the NMSException base constructor.</param>
+ /// <param name="innerException">Exception passed to the base constructor as the inner exception.</param>
+ public ConnectionFailedException(string message, Exception innerException)
+ : base(message, innerException)
+ {
+ }
+
+ /// <summary>
+ /// Creates the exception with the given message, error code and inner exception.
+ /// </summary>
+ /// <param name="message">Text passed to the NMSException base constructor.</param>
+ /// <param name="errorCode">Error code passed unchanged to the NMSException base constructor.</param>
+ /// <param name="innerException">Exception passed to the base constructor as the inner exception.</param>
+ public ConnectionFailedException(string message, string errorCode, Exception innerException)
+ : base(message, errorCode, innerException)
+ {
+ }
+
+ #region ISerializable interface implementation
+
+ /// <summary>
+ /// Initializes a new instance of the ConnectionFailedException class with serialized data.
+ /// Throws System.ArgumentNullException if the info parameter is null.
+ /// Throws System.Runtime.Serialization.SerializationException if the class name is null or System.Exception.HResult is zero (0).
+ /// </summary>
+ /// <param name="info">The SerializationInfo that holds the serialized object data about the exception being thrown.</param>
+ /// <param name="context">The StreamingContext that contains contextual information about the source or destination.</param>
+ protected ConnectionFailedException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context)
+ : base(info, context)
+ {
+ }
+
+ #endregion
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFailedException.cs
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1144402&r1=1144401&r2=1144402&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Fri Jul 8 17:46:16 2011
@@ -74,7 +74,32 @@ namespace Apache.NMS.ActiveMQ
if(destination == null)
{
throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
- }
+ }
+ else if(destination.PhysicalName == null)
+ {
+ throw new InvalidDestinationException("The destination object was not given a physical name.");
+ }
+ else if (destination.IsTemporary)
+ {
+ String physicalName = destination.PhysicalName;
+
+ if(String.IsNullOrEmpty(physicalName))
+ {
+ throw new InvalidDestinationException("Physical name of Destination should be valid: " + destination);
+ }
+
+ String connectionID = session.Connection.ConnectionId.Value;
+
+ if(physicalName.IndexOf(connectionID) < 0)
+ {
+ throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
+ }
+
+ if(!session.Connection.IsTempDestinationActive(destination as ActiveMQTempDestination))
+ {
+ throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
+ }
+ }
this.session = session;
this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
@@ -1201,6 +1226,11 @@ namespace Apache.NMS.ActiveMQ
get { return this.session.IsClientAcknowledge; }
}
+ /// <summary>
+ /// Tells whether this consumer is subscribed to the given temp destination.
+ /// </summary>
+ /// <param name="dest">The temp destination to compare against.</param>
+ /// <returns>
+ /// The result of comparing the destination in this consumer's ConsumerInfo
+ /// with <paramref name="dest"/> using Equals.
+ /// </returns>
+ internal bool IsInUse(ActiveMQTempDestination dest)
+ {
+     bool sameDestination = this.info.Destination.Equals(dest);
+     return sameDestination;
+ }
+
#region Nested ISyncronization Types
class MessageConsumerSynchronization : ISynchronization
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs?rev=1144402&r1=1144401&r2=1144402&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs Fri Jul 8 17:46:16 2011
@@ -255,7 +255,7 @@ namespace Apache.NMS.ActiveMQ
throw new ConnectionClosedException();
}
- session.DoSend(activeMessage, this, this.usage, this.RequestTimeout);
+ session.DoSend(dest, activeMessage, this, this.usage, this.RequestTimeout);
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=1144402&r1=1144401&r2=1144402&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Fri Jul 8 17:46:16 2011
@@ -655,10 +655,16 @@ namespace Apache.NMS.ActiveMQ
#endregion
- internal void DoSend( ActiveMQMessage message, MessageProducer producer, MemoryUsage producerWindow, TimeSpan sendTimeout )
+ internal void DoSend(ActiveMQDestination destination, ActiveMQMessage message,
+ MessageProducer producer, MemoryUsage producerWindow, TimeSpan sendTimeout)
{
ActiveMQMessage msg = message;
+ if(destination.IsTemporary && !connection.IsTempDestinationActive(destination as ActiveMQTempDestination))
+ {
+ throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
+ }
+
if(IsTransacted)
{
DoStartTransaction();
@@ -960,5 +966,17 @@ namespace Apache.NMS.ActiveMQ
}
}
+ /// <summary>
+ /// Tells whether any consumer of this session is subscribed to the given
+ /// temp destination.
+ /// </summary>
+ /// <param name="dest">The temp destination to look for.</param>
+ /// <returns>
+ /// True as soon as one consumer reports the destination as in use,
+ /// false when none of them does.
+ /// </returns>
+ internal bool IsInUse(ActiveMQTempDestination dest)
+ {
+     bool inUse = false;
+
+     foreach(MessageConsumer consumer in this.consumers.Values)
+     {
+         inUse = consumer.IsInUse(dest);
+         if(inUse)
+         {
+             // One match is enough; the remaining consumers are not asked.
+             break;
+         }
+     }
+
+     return inUse;
+ }
}
}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/AdvisorySupport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/AdvisorySupport.cs?rev=1144402&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/AdvisorySupport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/AdvisorySupport.cs Fri Jul 8 17:46:16 2011
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ
+{
+    /// <summary>
+    /// Names of the ActiveMQ advisory topics and advisory message properties,
+    /// plus helpers that build the advisory topic for a destination and that
+    /// test whether a destination is an advisory topic of a particular kind.
+    /// </summary>
+    /// <remarks>
+    /// The class only exposes static members; the private constructor prevents
+    /// instantiation. None of the methods validate their argument, so passing a
+    /// destination that transforms to null fails on first member access.
+    /// </remarks>
+    public class AdvisorySupport
+    {
+        // NOTE: static field initializers run in textual order; the prefixes and
+        // topics below that are built from earlier fields rely on that order.
+        public static readonly String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
+        public static readonly ActiveMQTopic CONNECTION_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Connection");
+        public static readonly ActiveMQTopic QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Queue");
+        public static readonly ActiveMQTopic TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Topic");
+        public static readonly ActiveMQTopic TEMP_QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempQueue");
+        public static readonly ActiveMQTopic TEMP_TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempTopic");
+        public static readonly String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Producer.";
+        public static readonly String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue.";
+        public static readonly String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic.";
+        public static readonly String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer.";
+        public static readonly String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue.";
+        public static readonly String TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic.";
+        public static readonly String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Topic.";
+        public static readonly String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Queue.";
+        public static readonly String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic.";
+        public static readonly String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Queue.";
+        public static readonly String SLOW_CONSUMER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "SlowConsumer.";
+        public static readonly String FAST_PRODUCER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FastProducer.";
+        // The misspelled identifier is part of the public surface and is kept as-is.
+        public static readonly String MESSAGE_DISCAREDED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDiscarded.";
+        public static readonly String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL.";
+        public static readonly String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered.";
+        public static readonly String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
+        public static readonly String MESSAGE_DLQ_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDLQd.";
+        public static readonly String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
+        public static readonly String NETWORK_BRIDGE_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NetworkBridge";
+        public static readonly String AGENT_TOPIC = "ActiveMQ.Agent";
+        // The misspelled identifier is part of the public surface and is kept as-is.
+        public static readonly String ADIVSORY_MESSAGE_TYPE = "Advisory";
+        public static readonly String MSG_PROPERTY_ORIGIN_BROKER_ID = "originBrokerId";
+        public static readonly String MSG_PROPERTY_ORIGIN_BROKER_NAME = "originBrokerName";
+        public static readonly String MSG_PROPERTY_ORIGIN_BROKER_URL = "originBrokerURL";
+        public static readonly String MSG_PROPERTY_USAGE_NAME = "usageName";
+        public static readonly String MSG_PROPERTY_CONSUMER_ID = "consumerId";
+        public static readonly String MSG_PROPERTY_PRODUCER_ID = "producerId";
+        // NOTE(review): the value spelling ("orignal") is a runtime property name and
+        // is deliberately left untouched; presumably it must match the broker - confirm.
+        public static readonly String MSG_PROPERTY_MESSAGE_ID = "orignalMessageId";
+        public static readonly String MSG_PROPERTY_CONSUMER_COUNT = "consumerCount";
+        public static readonly String MSG_PROPERTY_DISCARDED_COUNT = "discardedCount";
+
+        /// <summary>
+        /// Topic whose physical name is the temp-queue and temp-topic advisory
+        /// topic names joined by a comma.
+        /// </summary>
+        public static readonly ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
+            TEMP_QUEUE_ADVISORY_TOPIC.PhysicalName + "," + TEMP_TOPIC_ADVISORY_TOPIC.PhysicalName);
+
+        private AdvisorySupport()
+        {
+        }
+
+        #region Private helpers
+
+        /// <summary>
+        /// Builds a topic named prefix + destination type string + "." + physical name.
+        /// </summary>
+        private static ActiveMQTopic CreateTypedAdvisoryTopic(String prefix, ActiveMQDestination destination)
+        {
+            String name = prefix + destination.GetDestinationTypeAsString() + "."
+                + destination.PhysicalName;
+            return new ActiveMQTopic(name);
+        }
+
+        /// <summary>
+        /// Returns true when the destination is a topic whose physical name starts
+        /// with the given prefix, or - for a composite destination - when any of its
+        /// component destinations satisfies that test.
+        /// </summary>
+        private static bool IsTopicWithPrefix(ActiveMQDestination destination, String prefix)
+        {
+            if (destination.IsComposite)
+            {
+                foreach (ActiveMQDestination component in destination.GetCompositeDestinations())
+                {
+                    if (IsTopicWithPrefix(component, prefix))
+                    {
+                        return true;
+                    }
+                }
+                return false;
+            }
+
+            // Topic names are identifiers, not linguistic text, so the comparison
+            // is ordinal rather than dependent on the current culture.
+            return destination.IsTopic
+                && destination.PhysicalName.StartsWith(prefix, StringComparison.Ordinal);
+        }
+
+        #endregion
+
+        #region Advisory topic factories
+
+        /// <summary>Returns the shared connection advisory topic.</summary>
+        public static ActiveMQTopic GetConnectionAdvisoryTopic()
+        {
+            return CONNECTION_ADVISORY_TOPIC;
+        }
+
+        /// <summary>Transforms the destination and returns its consumer advisory topic.</summary>
+        public static ActiveMQTopic GetConsumerAdvisoryTopic(IDestination destination)
+        {
+            return GetConsumerAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>
+        /// Returns the consumer advisory topic for the destination; the queue prefix
+        /// is used when <c>IsQueue</c> is true and the topic prefix otherwise.
+        /// </summary>
+        public static ActiveMQTopic GetConsumerAdvisoryTopic(ActiveMQDestination destination)
+        {
+            String prefix = destination.IsQueue
+                ? QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX
+                : TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX;
+            return new ActiveMQTopic(prefix + destination.PhysicalName);
+        }
+
+        /// <summary>Transforms the destination and returns its producer advisory topic.</summary>
+        public static ActiveMQTopic GetProducerAdvisoryTopic(IDestination destination)
+        {
+            return GetProducerAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>
+        /// Returns the producer advisory topic for the destination; the queue prefix
+        /// is used when <c>IsQueue</c> is true and the topic prefix otherwise.
+        /// </summary>
+        public static ActiveMQTopic GetProducerAdvisoryTopic(ActiveMQDestination destination)
+        {
+            String prefix = destination.IsQueue
+                ? QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX
+                : TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX;
+            return new ActiveMQTopic(prefix + destination.PhysicalName);
+        }
+
+        /// <summary>Transforms the destination and returns its expired-message advisory topic.</summary>
+        public static ActiveMQTopic GetExpiredMessageTopic(IDestination destination)
+        {
+            return GetExpiredMessageTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>
+        /// Returns the expired-message advisory topic, choosing the queue variant
+        /// when <c>IsQueue</c> is true and the topic variant otherwise.
+        /// </summary>
+        public static ActiveMQTopic GetExpiredMessageTopic(ActiveMQDestination destination)
+        {
+            if (destination.IsQueue)
+            {
+                return GetExpiredQueueMessageAdvisoryTopic(destination);
+            }
+            return GetExpiredTopicMessageAdvisoryTopic(destination);
+        }
+
+        /// <summary>Returns the "Expired.Topic." advisory topic for the destination name.</summary>
+        public static ActiveMQTopic GetExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination)
+        {
+            return new ActiveMQTopic(EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX + destination.PhysicalName);
+        }
+
+        /// <summary>Transforms the destination and returns its "Expired.Queue." advisory topic.</summary>
+        public static ActiveMQTopic GetExpiredQueueMessageAdvisoryTopic(IDestination destination)
+        {
+            return GetExpiredQueueMessageAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>Returns the "Expired.Queue." advisory topic for the destination name.</summary>
+        public static ActiveMQTopic GetExpiredQueueMessageAdvisoryTopic(ActiveMQDestination destination)
+        {
+            return new ActiveMQTopic(EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX + destination.PhysicalName);
+        }
+
+        /// <summary>Transforms the destination and returns its "NoConsumer.Topic." advisory topic.</summary>
+        public static ActiveMQTopic GetNoTopicConsumersAdvisoryTopic(IDestination destination)
+        {
+            return GetNoTopicConsumersAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>Returns the "NoConsumer.Topic." advisory topic for the destination name.</summary>
+        public static ActiveMQTopic GetNoTopicConsumersAdvisoryTopic(ActiveMQDestination destination)
+        {
+            return new ActiveMQTopic(NO_TOPIC_CONSUMERS_TOPIC_PREFIX + destination.PhysicalName);
+        }
+
+        /// <summary>Transforms the destination and returns its "NoConsumer.Queue." advisory topic.</summary>
+        public static ActiveMQTopic GetNoQueueConsumersAdvisoryTopic(IDestination destination)
+        {
+            return GetNoQueueConsumersAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>Returns the "NoConsumer.Queue." advisory topic for the destination name.</summary>
+        public static ActiveMQTopic GetNoQueueConsumersAdvisoryTopic(ActiveMQDestination destination)
+        {
+            return new ActiveMQTopic(NO_QUEUE_CONSUMERS_TOPIC_PREFIX + destination.PhysicalName);
+        }
+
+        /// <summary>Transforms the destination and returns its slow-consumer advisory topic.</summary>
+        public static ActiveMQTopic GetSlowConsumerAdvisoryTopic(IDestination destination)
+        {
+            return GetSlowConsumerAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>Returns the slow-consumer advisory topic (prefix + type + "." + name).</summary>
+        public static ActiveMQTopic GetSlowConsumerAdvisoryTopic(ActiveMQDestination destination)
+        {
+            return CreateTypedAdvisoryTopic(SLOW_CONSUMER_TOPIC_PREFIX, destination);
+        }
+
+        /// <summary>Transforms the destination and returns its fast-producer advisory topic.</summary>
+        public static ActiveMQTopic GetFastProducerAdvisoryTopic(IDestination destination)
+        {
+            return GetFastProducerAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>Returns the fast-producer advisory topic (prefix + type + "." + name).</summary>
+        public static ActiveMQTopic GetFastProducerAdvisoryTopic(ActiveMQDestination destination)
+        {
+            return CreateTypedAdvisoryTopic(FAST_PRODUCER_TOPIC_PREFIX, destination);
+        }
+
+        /// <summary>Transforms the destination and returns its message-discarded advisory topic.</summary>
+        public static ActiveMQTopic GetMessageDiscardedAdvisoryTopic(IDestination destination)
+        {
+            return GetMessageDiscardedAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>Returns the message-discarded advisory topic (prefix + type + "." + name).</summary>
+        public static ActiveMQTopic GetMessageDiscardedAdvisoryTopic(ActiveMQDestination destination)
+        {
+            return CreateTypedAdvisoryTopic(MESSAGE_DISCAREDED_TOPIC_PREFIX, destination);
+        }
+
+        /// <summary>Transforms the destination and returns its message-delivered advisory topic.</summary>
+        public static ActiveMQTopic GetMessageDeliveredAdvisoryTopic(IDestination destination)
+        {
+            return GetMessageDeliveredAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>Returns the message-delivered advisory topic (prefix + type + "." + name).</summary>
+        public static ActiveMQTopic GetMessageDeliveredAdvisoryTopic(ActiveMQDestination destination)
+        {
+            return CreateTypedAdvisoryTopic(MESSAGE_DELIVERED_TOPIC_PREFIX, destination);
+        }
+
+        /// <summary>Transforms the destination and returns its message-consumed advisory topic.</summary>
+        public static ActiveMQTopic GetMessageConsumedAdvisoryTopic(IDestination destination)
+        {
+            return GetMessageConsumedAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>Returns the message-consumed advisory topic (prefix + type + "." + name).</summary>
+        public static ActiveMQTopic GetMessageConsumedAdvisoryTopic(ActiveMQDestination destination)
+        {
+            return CreateTypedAdvisoryTopic(MESSAGE_CONSUMED_TOPIC_PREFIX, destination);
+        }
+
+        /// <summary>Returns the "MessageDLQd." advisory topic (prefix + type + "." + name).</summary>
+        public static ActiveMQTopic GetMessageDLQdAdvisoryTopic(ActiveMQDestination destination)
+        {
+            return CreateTypedAdvisoryTopic(MESSAGE_DLQ_TOPIC_PREFIX, destination);
+        }
+
+        /// <summary>
+        /// Returns the master broker advisory topic. The topic name does not depend
+        /// on any destination, so the argument is ignored.
+        /// </summary>
+        /// <param name="destination">Unused; kept for signature compatibility.</param>
+        public static ActiveMQTopic GetMasterBrokerAdvisoryTopic(IDestination destination)
+        {
+            // Previously this forwarded the transformed destination to
+            // GetMasterBrokerAdvisoryTopic(...). As no ActiveMQDestination overload
+            // exists, that call bound back to this very method and recursed without
+            // end. Delegate to the parameterless overload instead.
+            return GetMasterBrokerAdvisoryTopic();
+        }
+
+        /// <summary>Returns a new topic named by <c>MASTER_BROKER_TOPIC_PREFIX</c>.</summary>
+        public static ActiveMQTopic GetMasterBrokerAdvisoryTopic()
+        {
+            return new ActiveMQTopic(MASTER_BROKER_TOPIC_PREFIX);
+        }
+
+        /// <summary>Returns a new topic named by <c>NETWORK_BRIDGE_TOPIC_PREFIX</c>.</summary>
+        public static ActiveMQTopic GetNetworkBridgeAdvisoryTopic()
+        {
+            return new ActiveMQTopic(NETWORK_BRIDGE_TOPIC_PREFIX);
+        }
+
+        /// <summary>Transforms the destination and returns its "FULL." advisory topic.</summary>
+        public static ActiveMQTopic GetFullAdvisoryTopic(IDestination destination)
+        {
+            return GetFullAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>Returns the "FULL." advisory topic (prefix + type + "." + name).</summary>
+        public static ActiveMQTopic GetFullAdvisoryTopic(ActiveMQDestination destination)
+        {
+            return CreateTypedAdvisoryTopic(FULL_TOPIC_PREFIX, destination);
+        }
+
+        /// <summary>Transforms the destination and returns the advisory topic for its type.</summary>
+        public static ActiveMQTopic GetDestinationAdvisoryTopic(IDestination destination)
+        {
+            return GetDestinationAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>
+        /// Maps the destination type (queue, topic, temporary queue, temporary
+        /// topic) to the matching shared advisory topic.
+        /// </summary>
+        /// <exception cref="NMSException">The destination type is none of the four handled types.</exception>
+        public static ActiveMQTopic GetDestinationAdvisoryTopic(ActiveMQDestination destination)
+        {
+            switch (destination.GetDestinationType())
+            {
+                case ActiveMQDestination.ACTIVEMQ_QUEUE:
+                    return QUEUE_ADVISORY_TOPIC;
+                case ActiveMQDestination.ACTIVEMQ_TOPIC:
+                    return TOPIC_ADVISORY_TOPIC;
+                case ActiveMQDestination.ACTIVEMQ_TEMPORARY_QUEUE:
+                    return TEMP_QUEUE_ADVISORY_TOPIC;
+                case ActiveMQDestination.ACTIVEMQ_TEMPORARY_TOPIC:
+                    return TEMP_TOPIC_ADVISORY_TOPIC;
+                default:
+                    throw new NMSException("Unknown destination type: " + destination.DestinationType);
+            }
+        }
+
+        #endregion
+
+        #region Advisory topic predicates
+
+        /// <summary>Transforms the destination and tests it with the ActiveMQDestination overload.</summary>
+        public static bool IsDestinationAdvisoryTopic(IDestination destination)
+        {
+            return IsDestinationAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>
+        /// Returns true when the destination equals one of the queue, topic,
+        /// temp-queue or temp-topic advisory topics; a composite destination
+        /// matches when any of its components does.
+        /// </summary>
+        public static bool IsDestinationAdvisoryTopic(ActiveMQDestination destination)
+        {
+            if (destination.IsComposite)
+            {
+                foreach (ActiveMQDestination component in destination.GetCompositeDestinations())
+                {
+                    if (IsDestinationAdvisoryTopic(component))
+                    {
+                        return true;
+                    }
+                }
+                return false;
+            }
+
+            return destination.Equals(TEMP_QUEUE_ADVISORY_TOPIC) ||
+                   destination.Equals(TEMP_TOPIC_ADVISORY_TOPIC) ||
+                   destination.Equals(QUEUE_ADVISORY_TOPIC) ||
+                   destination.Equals(TOPIC_ADVISORY_TOPIC);
+        }
+
+        /// <summary>Transforms the destination and tests it with the ActiveMQDestination overload.</summary>
+        public static bool IsAdvisoryTopic(IDestination destination)
+        {
+            return IsAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>
+        /// Returns true for a topic whose name starts with <c>ADVISORY_TOPIC_PREFIX</c>,
+        /// or for a composite destination containing such a topic.
+        /// </summary>
+        public static bool IsAdvisoryTopic(ActiveMQDestination destination)
+        {
+            return IsTopicWithPrefix(destination, ADVISORY_TOPIC_PREFIX);
+        }
+
+        /// <summary>Transforms the destination and tests it with the ActiveMQDestination overload.</summary>
+        public static bool IsConnectionAdvisoryTopic(IDestination destination)
+        {
+            return IsConnectionAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>
+        /// Returns true when the destination equals <c>CONNECTION_ADVISORY_TOPIC</c>,
+        /// or is a composite destination with a component that does.
+        /// </summary>
+        public static bool IsConnectionAdvisoryTopic(ActiveMQDestination destination)
+        {
+            if (destination.IsComposite)
+            {
+                foreach (ActiveMQDestination component in destination.GetCompositeDestinations())
+                {
+                    if (IsConnectionAdvisoryTopic(component))
+                    {
+                        return true;
+                    }
+                }
+                return false;
+            }
+
+            return destination.Equals(CONNECTION_ADVISORY_TOPIC);
+        }
+
+        /// <summary>Transforms the destination and tests it with the ActiveMQDestination overload.</summary>
+        public static bool IsProducerAdvisoryTopic(IDestination destination)
+        {
+            return IsProducerAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>
+        /// Returns true for a topic whose name starts with <c>PRODUCER_ADVISORY_TOPIC_PREFIX</c>,
+        /// or for a composite destination containing such a topic.
+        /// </summary>
+        public static bool IsProducerAdvisoryTopic(ActiveMQDestination destination)
+        {
+            return IsTopicWithPrefix(destination, PRODUCER_ADVISORY_TOPIC_PREFIX);
+        }
+
+        /// <summary>Transforms the destination and tests it with the ActiveMQDestination overload.</summary>
+        public static bool IsConsumerAdvisoryTopic(IDestination destination)
+        {
+            return IsConsumerAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>
+        /// Returns true for a topic whose name starts with <c>CONSUMER_ADVISORY_TOPIC_PREFIX</c>,
+        /// or for a composite destination containing such a topic.
+        /// </summary>
+        public static bool IsConsumerAdvisoryTopic(ActiveMQDestination destination)
+        {
+            return IsTopicWithPrefix(destination, CONSUMER_ADVISORY_TOPIC_PREFIX);
+        }
+
+        /// <summary>Transforms the destination and tests it with the ActiveMQDestination overload.</summary>
+        public static bool IsSlowConsumerAdvisoryTopic(IDestination destination)
+        {
+            return IsSlowConsumerAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>
+        /// Returns true for a topic whose name starts with <c>SLOW_CONSUMER_TOPIC_PREFIX</c>,
+        /// or for a composite destination containing such a topic.
+        /// </summary>
+        public static bool IsSlowConsumerAdvisoryTopic(ActiveMQDestination destination)
+        {
+            return IsTopicWithPrefix(destination, SLOW_CONSUMER_TOPIC_PREFIX);
+        }
+
+        /// <summary>Transforms the destination and tests it with the ActiveMQDestination overload.</summary>
+        public static bool IsFastProducerAdvisoryTopic(IDestination destination)
+        {
+            return IsFastProducerAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>
+        /// Returns true for a topic whose name starts with <c>FAST_PRODUCER_TOPIC_PREFIX</c>,
+        /// or for a composite destination containing such a topic.
+        /// </summary>
+        public static bool IsFastProducerAdvisoryTopic(ActiveMQDestination destination)
+        {
+            return IsTopicWithPrefix(destination, FAST_PRODUCER_TOPIC_PREFIX);
+        }
+
+        /// <summary>Transforms the destination and tests it with the ActiveMQDestination overload.</summary>
+        public static bool IsMessageConsumedAdvisoryTopic(IDestination destination)
+        {
+            return IsMessageConsumedAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>
+        /// Returns true for a topic whose name starts with <c>MESSAGE_CONSUMED_TOPIC_PREFIX</c>,
+        /// or for a composite destination containing such a topic.
+        /// </summary>
+        public static bool IsMessageConsumedAdvisoryTopic(ActiveMQDestination destination)
+        {
+            return IsTopicWithPrefix(destination, MESSAGE_CONSUMED_TOPIC_PREFIX);
+        }
+
+        /// <summary>Transforms the destination and tests it with the ActiveMQDestination overload.</summary>
+        public static bool IsMasterBrokerAdvisoryTopic(IDestination destination)
+        {
+            return IsMasterBrokerAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>
+        /// Returns true for a topic whose name starts with <c>MASTER_BROKER_TOPIC_PREFIX</c>,
+        /// or for a composite destination containing such a topic.
+        /// </summary>
+        public static bool IsMasterBrokerAdvisoryTopic(ActiveMQDestination destination)
+        {
+            return IsTopicWithPrefix(destination, MASTER_BROKER_TOPIC_PREFIX);
+        }
+
+        /// <summary>Transforms the destination and tests it with the ActiveMQDestination overload.</summary>
+        public static bool IsMessageDeliveredAdvisoryTopic(IDestination destination)
+        {
+            return IsMessageDeliveredAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>
+        /// Returns true for a topic whose name starts with <c>MESSAGE_DELIVERED_TOPIC_PREFIX</c>,
+        /// or for a composite destination containing such a topic.
+        /// </summary>
+        public static bool IsMessageDeliveredAdvisoryTopic(ActiveMQDestination destination)
+        {
+            return IsTopicWithPrefix(destination, MESSAGE_DELIVERED_TOPIC_PREFIX);
+        }
+
+        /// <summary>Transforms the destination and tests it with the ActiveMQDestination overload.</summary>
+        public static bool IsMessageDiscardedAdvisoryTopic(IDestination destination)
+        {
+            return IsMessageDiscardedAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>
+        /// Returns true for a topic whose name starts with <c>MESSAGE_DISCAREDED_TOPIC_PREFIX</c>,
+        /// or for a composite destination containing such a topic.
+        /// </summary>
+        public static bool IsMessageDiscardedAdvisoryTopic(ActiveMQDestination destination)
+        {
+            return IsTopicWithPrefix(destination, MESSAGE_DISCAREDED_TOPIC_PREFIX);
+        }
+
+        /// <summary>Transforms the destination and tests it with the ActiveMQDestination overload.</summary>
+        public static bool IsFullAdvisoryTopic(IDestination destination)
+        {
+            return IsFullAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        /// <summary>
+        /// Returns true for a topic whose name starts with <c>FULL_TOPIC_PREFIX</c>,
+        /// or for a composite destination containing such a topic.
+        /// </summary>
+        public static bool IsFullAdvisoryTopic(ActiveMQDestination destination)
+        {
+            return IsTopicWithPrefix(destination, FULL_TOPIC_PREFIX);
+        }
+
+        #endregion
+    }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/AdvisorySupport.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/TempDestinationTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/TempDestinationTest.cs?rev=1144402&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/TempDestinationTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/TempDestinationTest.cs Fri Jul 8 17:46:16 2011
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Collections.Specialized;
+using System.Threading;
+using Apache.NMS;
+using Apache.NMS.Test;
+using Apache.NMS.Util;
+using Apache.NMS.Policies;
+using NUnit.Framework;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+    /// <summary>
+    /// Tests for temporary destinations: visibility across connections, message
+    /// retention, and failure of sends/deletes on destinations that are gone or
+    /// still in use. Every connection a test opens is recorded in
+    /// <c>connections</c> and closed in <see cref="TearDown"/>.
+    /// </summary>
+    [TestFixture]
+    public class TempDestinationTest : NMSTestSupport
+    {
+        private Connection connection;
+        private readonly IList connections = ArrayList.Synchronized(new ArrayList());
+
+        [SetUp]
+        public override void SetUp()
+        {
+            base.SetUp();
+
+            connection = this.CreateConnection() as Connection;
+            connections.Add(connection);
+        }
+
+        [TearDown]
+        public override void TearDown()
+        {
+            foreach(Connection openConnection in connections)
+            {
+                try
+                {
+                    openConnection.Close();
+                }
+                catch(Exception ex)
+                {
+                    // Closing is best effort (a test may already have closed the
+                    // connection), but leave a trace instead of hiding the error.
+                    Tracer.Debug("Ignoring error while closing test connection: " + ex.Message);
+                }
+            }
+
+            connections.Clear();
+
+            base.TearDown();
+        }
+
+        /// <summary>
+        /// Make sure Temp destination can only be consumed by local connection
+        /// </summary>
+        [Test]
+        public void TestTempDestOnlyConsumedByLocalConn()
+        {
+            connection.Start();
+
+            ISession tempSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            ITemporaryQueue queue = tempSession.CreateTemporaryQueue();
+            IMessageProducer producer = tempSession.CreateProducer(queue);
+            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+            ITextMessage message = tempSession.CreateTextMessage("First");
+            producer.Send(message);
+
+            // The temp destination should not be consumable through another connection.
+            Connection otherConnection = CreateConnection() as Connection;
+            connections.Add(otherConnection);
+            ISession otherSession = otherConnection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            ITemporaryQueue otherQueue = otherSession.CreateTemporaryQueue();
+            IMessageConsumer consumer = otherSession.CreateConsumer(otherQueue);
+            IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+            Assert.IsNull(msg);
+
+            // Creating a consumer on a temp destination owned by another
+            // connection should throw InvalidDestinationException.
+            try
+            {
+                consumer = otherSession.CreateConsumer(queue);
+                Assert.Fail("CreateConsumer should fail since a temp destination cannot be consumed from another connection");
+            }
+            catch(InvalidDestinationException)
+            {
+                // Expected: reaching this handler is the success path.
+            }
+
+            // Consuming the temp destination from the owning connection works.
+            consumer = tempSession.CreateConsumer(queue);
+            msg = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+            Assert.NotNull(msg);
+        }
+
+        /// <summary>
+        /// Make sure that a temp queue does not drop messages if there is an active consumer.
+        /// </summary>
+        [Test]
+        public void TestTempQueueHoldsMessagesWithConsumers()
+        {
+            ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            IQueue queue = session.CreateTemporaryQueue();
+            IMessageConsumer consumer = session.CreateConsumer(queue);
+            connection.Start();
+
+            IMessageProducer producer = session.CreateProducer(queue);
+            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+            ITextMessage message = session.CreateTextMessage("Hello");
+            producer.Send(message);
+
+            IMessage message2 = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(message2);
+            Assert.IsTrue(message2 is ITextMessage, "Expected message to be a TextMessage");
+            Assert.IsTrue(((ITextMessage)message2).Text.Equals(message.Text),
+                "Expected message to be a '" + message.Text + "'");
+        }
+
+        /// <summary>
+        /// Make sure that a temp queue does not drop messages if there are no active consumers.
+        /// </summary>
+        [Test]
+        public void TestTempQueueHoldsMessagesWithoutConsumers()
+        {
+            ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            IQueue queue = session.CreateTemporaryQueue();
+            IMessageProducer producer = session.CreateProducer(queue);
+            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+            ITextMessage message = session.CreateTextMessage("Hello");
+            producer.Send(message);
+
+            // The consumer is only created after the message has been sent.
+            connection.Start();
+            IMessageConsumer consumer = session.CreateConsumer(queue);
+            IMessage message2 = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+            Assert.IsNotNull(message2);
+            Assert.IsTrue(message2 is ITextMessage, "Expected message to be a TextMessage");
+            Assert.IsTrue(((ITextMessage)message2).Text.Equals(message.Text),
+                "Expected message to be a '" + message.Text + "'");
+        }
+
+        /// <summary>
+        /// Test temp queue works under load
+        /// </summary>
+        [Test]
+        public void TestTmpQueueWorksUnderLoad()
+        {
+            int count = 500;
+            int dataSize = 1024;
+
+            ArrayList list = new ArrayList(count);
+            ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            IQueue queue = session.CreateTemporaryQueue();
+            IMessageProducer producer = session.CreateProducer(queue);
+            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+
+            byte[] data = new byte[dataSize];
+            for (int i = 0; i < count; i++)
+            {
+                IBytesMessage message = session.CreateBytesMessage();
+                message.WriteBytes(data);
+                // Tag each message with its send index so ordering can be checked.
+                message.Properties.SetInt("c", i);
+                producer.Send(message);
+                list.Add(message);
+            }
+
+            connection.Start();
+            IMessageConsumer consumer = session.CreateConsumer(queue);
+            for (int i = 0; i < count; i++)
+            {
+                IMessage message2 = consumer.Receive(TimeSpan.FromMilliseconds(2000));
+                Assert.IsTrue(message2 != null);
+                Assert.AreEqual(i, message2.Properties.GetInt("c"));
+                Assert.IsTrue(message2.Equals(list[i]));
+            }
+        }
+
+        /// <summary>
+        /// Make sure you cannot publish to a temp destination whose owning
+        /// connection has been closed.
+        /// </summary>
+        [Test]
+        public void TestPublishFailsForClosedConnection()
+        {
+            Connection tempConnection = CreateConnection() as Connection;
+            connections.Add(tempConnection);
+            ISession tempSession = tempConnection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            ITemporaryQueue queue = tempSession.CreateTemporaryQueue();
+
+            ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            connection.Start();
+
+            // This message delivery should work since the temp connection is
+            // still open.
+            IMessageProducer producer = session.CreateProducer(queue);
+            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+            ITextMessage message = session.CreateTextMessage("First");
+            producer.Send(message);
+            Thread.Sleep(1000);
+
+            // Closing the connection should destroy the temp queue that was
+            // created.
+            tempConnection.Close();
+            Thread.Sleep(5000); // Wait a little bit to let the delete take effect.
+
+            // This message delivery should NOT work since the temp connection
+            // is now closed.
+            try
+            {
+                message = session.CreateTextMessage("Hello");
+                producer.Send(message);
+                Assert.Fail("Send should fail since temp destination should not exist anymore.");
+            }
+            catch(NMSException e)
+            {
+                Tracer.Debug("Test threw expected exception: " + e.Message);
+            }
+        }
+
+        /// <summary>
+        /// Make sure you cannot publish to a temp destination that has been deleted.
+        /// </summary>
+        [Test]
+        public void TestPublishFailsForDestoryedTempDestination()
+        {
+            Connection tempConnection = CreateConnection() as Connection;
+            connections.Add(tempConnection);
+            ISession tempSession = tempConnection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            ITemporaryQueue queue = tempSession.CreateTemporaryQueue();
+
+            ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            connection.Start();
+
+            // This message delivery should work since the temp queue still
+            // exists.
+            IMessageProducer producer = session.CreateProducer(queue);
+            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+            ITextMessage message = session.CreateTextMessage("First");
+            producer.Send(message);
+            Thread.Sleep(1000);
+
+            // Deleting the queue will cause sends to fail.
+            queue.Delete();
+            Thread.Sleep(5000); // Wait a little bit to let the delete take effect.
+
+            // This message delivery should NOT work since the temp queue has
+            // been deleted.
+            try
+            {
+                message = session.CreateTextMessage("Hello");
+                producer.Send(message);
+                Assert.Fail("Send should fail since temp destination should not exist anymore.");
+            }
+            catch(NMSException e)
+            {
+                Tracer.Debug("Test threw expected exception: " + e.Message);
+            }
+        }
+
+        /// <summary>
+        /// Test you can't delete a Destination with Active Subscribers
+        /// </summary>
+        [Test]
+        public void TestDeleteDestinationWithSubscribersFails()
+        {
+            // A dedicated connection is used; it is named differently from the
+            // fixture's "connection" field to avoid shadowing it.
+            Connection localConnection = CreateConnection() as Connection;
+            connections.Add(localConnection);
+            ISession session = localConnection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            ITemporaryQueue queue = session.CreateTemporaryQueue();
+
+            localConnection.Start();
+
+            session.CreateConsumer(queue);
+
+            try
+            {
+                queue.Delete();
+                Assert.Fail("Should fail as Subscribers are active");
+            }
+            catch(NMSException)
+            {
+                // Expected: reaching this handler is the success path.
+            }
+        }
+
+    }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/TempDestinationTest.cs
------------------------------------------------------------------------------
svn:eol-style = native