You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Subasish Mohapatra <su...@optym.com> on 2022/06/22 12:20:22 UTC

How to enable multiple consumers for Apache.NMS.AMQP clients?

Hi

Would like to know how to enable multiple consumers for Apache.NMS.AMQP client. Have tried with multiple sessions for same queue with different consumer - but the listener is only getting called for one consumer per queue. Below is the sample code. Ignore the connection per queue as I thought that might be the cause but doesn't work.
With the below sample code, if I try sending multiple messages for queue “Q/test”, only one consumer instance is getting called.

var queueClientLogger = loggerFactory.CreateLogger<QueueClient>();
var queueClient1 = new QueueClient(queueClientLogger, "Q/test1");
await queueClient1.InitializeAsync();

var queueClient2 = new QueueClient(queueClientLogger, "Q/test1");
await queueClient2.InitializeAsync();

var queueClient3 = new QueueClient(queueClientLogger, "Q/test2");
await queueClient3.InitializeAsync();

-----------------------------------------------
internal class QueueClient : IDisposable
{
  private readonly ILogger<QueueClient> logger;
  private IMessageConsumer consumer;
  private bool disposedValue;

  #region constructor

  public QueueClient(ILogger<QueueClient> logger, string queueName)
  {
      this.logger = logger;
      QueueName = queueName;
      ConsumerName = $"{QueueName}-{Guid.NewGuid()}";
  }

  #endregion

  #region Properties

  internal string? QueueName { get; private set; }
  internal string ConsumerName { get; private set; }
  internal Apache.NMS.ISession Session { get; private set; }
  internal Apache.NMS.IConnection Connection { get; private set; }


  #endregion

  #region Methods

  internal async Task InitializeAsync()
  {
      string brokerUri = $"amqp://localhost:5672";  // Default port
      NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);
      Connection = await factory.CreateConnectionAsync();
      await Connection.StartAsync();
      Session = await Connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
      Apache.NMS.IDestination dest = await Session.GetQueueAsync(QueueName);
      consumer = await Session.CreateConsumerAsync(dest);
      consumer.Listener += Consumer_Listener;
  }

  private void Consumer_Listener(Apache.NMS.IMessage message)
  {
      logger.LogInformation($"{ConsumerName}: Message from queue - {QueueName}");
      Thread.Sleep(1000);
      string content = string.Empty;
      if (message is ITextMessage)
      {
          ITextMessage? txtMsg = message as ITextMessage;
          content = txtMsg?.Text ?? "";
      }
      else if (message is IBytesMessage)
      {
          IBytesMessage? bytesMsg = message as IBytesMessage;
          if (bytesMsg == null)
          {
              content = $"NULL message received";
          }
          else
          {
              content = Encoding.UTF8.GetString(bytesMsg.Content);
          }
      }
      else
      {
          content = "Unexpected message type: " + message.GetType().Name;
      }
      logger.LogInformation($"{content}");
  }

  //Ignore IDosposable code
}


Regards,
Subasish Mohapatra
Senior Software Architect | Optym 
15th Floor, Tower B, Prestige Shantiniketan, Whitefield Main Road, Bangalore – 560048
M: +91.900.881.1448 | subasish.mohapatra@optym.com <ma...@optym.com>
www.optym.com<https://nam02.safelinks.protection.outlook.com/?url=http%3A%2F%2Fwww.optym.com%2F&data=04%7C01%7Cravi.ahuja%40optym.com%7C2636548a9f75483d836908d8754f4b17%7Cfe2cf47216154350ae3921441d02fbf1%7C0%7C0%7C637388330706635815%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&sdata=d4lmf2LCgPnrOTdC874uIZWXf2JOnOaLQE6wv4r4QI0%3D&reserved=0> 
 
[cid:image001.png@01D88660.8998BB20]
 
The email message (including any attachments) contains information that may be confidential, protected by applicable legal privileges, or contain non-public information. It is intended to be conveyed only to the designated recipient(s). If you are not an intended recipient of this message, please notify the sender by replying to this message and then delete it from your system. Use, dissemination, distribution or reproduction of this message by unintended recipients is not authorized and may be unlawful.