You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by neeraj kaushik <ne...@gmail.com> on 2009/04/06 09:25:12 UTC

Get Cosumers of queues in ActiveMQ using NMS

Advisory Topics of ActiveMQ implementation using NMS (Dot net api for
ActiveMQ) in C# application

http://activemq.apache.org/

This document highlights to get consumers, producers list attached to
activemq using NMS (dot net Api for connectivity). Unlike JMS, NMS doesn't
provide such information so I found some workaround for this. If system
knows about Consumers died and active then it can avoid unnecessary message
sending to dead consumers.

First of All I give you idea of application in which I need it.

There is an request response architecture in which client sends request to
server through activemq Queues and keep this request in list for sending
responses to these queues, but if any client goes down then this server
component doesn't know about dead consumers and try sending responses to
this consumers and activemq accumulated messages without consumers, which
sometimes halt activemq if buffers gets full.

So to accomplish this task I made changes in server component so that  it
becomes consumer for advisory topics
(http://activemq.apache.org/advisory-message.html). Whenever any consumer
attached to activemq, server component get notified with "ConsumerInfo" type
of message and if any consumer dies then it gets notified "RemoveInfo" type
of message.

Below is code snippet how I completed this functionality:

//Create Connection

public class AdvisorySupport : IAdvisorySupport,IDisposable

{

private IConnection _connection;

private ISession session;

public event RemovedConsumersHandler RemovedConsumer;

Dictionary _listofConsumers = new Dictionary();

public Dictionary ListofActiveConsumers

{

get { return _listofConsumers; }

set { _listofConsumers = value; }

}

 

public AdvisorySupport()

{

CreateNMSFactory();

CreateConnection();

}

private bool CreateNMSFactory()

{

connectionFactory = new ConnectionFactory(URI);

return (null != connectionFactory);

}

private bool CreateConnection()

{

_connection= connectionFactory.CreateConnection() :

_connection.Start();

SubscribeAdvisoryTopics();

return true;

}

internal bool CreateSession()

{

if (session == null)

session = _connection.CreateSession(AcknowledgementMode.AutoAcknowledge);

return session != null;

}

public void SubscribeAdvisoryTopics()

{ 

// ////Queue create & destroy

// Subscribe("ActiveMQ.Advisory.Queue");

// ////Expired messages on a Queue

// ////String='orignalMessageId' - the expired id 

//Subscribe("ActiveMQ.Advisory.Expired.Queue");

// //No consumer is available to process messages being sent on a Queue

// Subscribe("ActiveMQ.Advisory.NoConsumer.Queue");

// ////Message consumed by a client : String='orignalMessageId' - the
delivered id 

// Subscribe("ActiveMQ.Advisory.MessageConsumed.Queue");

// ////ActiveMQ.Advisory.Consumer.Queue Consumer start & stop messages on a
Queue String='consumerCount' - the number of Consumers ConsumerInfo 

// Subscribe("ActiveMQ.Advisory.Consumer..>");

if (CreateSession())

Subscribe("ActiveMQ.Advisory.Consumer.Queue.>");

}

private void Subscribe(string topicname)

{

ITopic topicForListenQueue = session.GetTopic(topicname);

IMessageConsumer consumerForListenQueue =
session.CreateConsumer(topicForListenQueue);

consumerForListenQueue.Listener += new
MessageListener(consumerForListenQueue_Listener);

}

void consumerForListenQueue_Listener(IMessage message)

{

if (message == null)

throw new Connamara.ActiveMQPublisher.ActiveMQPublisherException("got null
msg");

ActiveMQMessage aMsg = (ActiveMQMessage)message;

if (((Apache.NMS.ActiveMQ.Commands.Message)(aMsg)).DataStructure is
ConsumerInfo)

{

ConsumerInfo info =
(ConsumerInfo)((Apache.NMS.ActiveMQ.Commands.Message)(aMsg)).DataStructure;

if (!_listofConsumers.ContainsKey(info.ConsumerId.ToString()))

_listofConsumers[info.ConsumerId.ToString()] = info;

}

else if (((Apache.NMS.ActiveMQ.Commands.Message)(aMsg)).DataStructure is
RemoveInfo)

{

RemoveInfo info =
(RemoveInfo)((Apache.NMS.ActiveMQ.Commands.Message)(aMsg)).DataStructure;

if (_listofConsumers.ContainsKey(info.ObjectId.ToString()))

_listofConsumers.Remove(info.ObjectId.ToString());

 

if (RemovedConsumer != null)

RemovedConsumer(info.ObjectId.ToString());

}

}

#region IDisposable Members

public void Dispose()

{

if (session != null)

session.Close();

}

#endregion

}


-- 
View this message in context: http://www.nabble.com/Get-Cosumers-of-queues-in-ActiveMQ-using-NMS-tp22903480p22903480.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.