Posted to issues@activemq.apache.org by "Svetlana Undalova (Jira)" <ji...@apache.org> on 2021/02/09 10:06:00 UTC

[jira] [Created] (ARTEMIS-3110) PullMessage timeout is ignored

Svetlana Undalova created ARTEMIS-3110:
------------------------------------------

             Summary: PullMessage timeout is ignored
                 Key: ARTEMIS-3110
                 URL: https://issues.apache.org/jira/browse/ARTEMIS-3110
             Project: ActiveMQ Artemis
          Issue Type: Bug
    Affects Versions: 2.16.0
            Reporter: Svetlana Undalova


I ran into an issue while using Apache ActiveMQ Artemis (2.16.0).

I have slow consumers, so I set the prefetch size to 0, which according to the docs makes the client pull messages from the broker. This works well if a consumer starts after messages are already in the queue. However, if the consumer sits idle on an empty queue and calls consumer.Receive() N times in the meantime, then the next time messages appear in the queue, N of them are sent to the client at once (I verified this by checking UnconsumedMessageCount).

I checked the same setup with ActiveMQ (5.16), and there both scenarios work correctly. I think the issue may occur because the Artemis broker ignores the PullMessage timeout.

Here is my code

 
{code:java}
static async Task Main(string[] args)
{
    var connectionfactory = new Apache.NMS.ActiveMQ.ConnectionFactory("tcp://localhost:61616?connection.PrefetchPolicy.queuePrefetch=0");

    var slowConsumer = ConsumerProcess("slow consumer", TimeSpan.FromSeconds(120), connectionfactory);
    var fastConsumer = ConsumerProcess("fast consumer", TimeSpan.FromSeconds(10), connectionfactory);

    // To make sure several pull requests are sent
    await Task.Delay(TimeSpan.FromSeconds(5));

    // Adding messages
    using (var connection = connectionfactory.CreateConnection("user", "password"))
    {
        connection.Start();
        using (var session = connection.CreateSession(acknowledgementMode: Apache.NMS.AcknowledgementMode.AutoAcknowledge))
        {
            var destination = Apache.NMS.Util.SessionUtil.GetDestination(session, "queue://cookies");
            using (var producer = session.CreateProducer(destination))
            {
                for (int i = 0; i < 10; i++)
                {
                    var message = producer.CreateTextMessage();
                    message.Text = $"{i}_{DateTimeOffset.Now}_{Guid.NewGuid()}";
                    Console.WriteLine($"{DateTimeOffset.Now}\tSending message {message.Text}");
                    producer.Send(message);
                }
            }
        }
    }
    Console.WriteLine("Press enter to exit");
    Console.ReadLine();
}


private static async Task ConsumerProcess(string name, TimeSpan processingTime, Apache.NMS.IConnectionFactory connectionfactory)
{
    using (var connection = connectionfactory.CreateConnection("user", "password"))
    {
        connection.Start();
        using (var session = connection.CreateSession(acknowledgementMode: Apache.NMS.AcknowledgementMode.IndividualAcknowledge))
        {
            var destination = Apache.NMS.Util.SessionUtil.GetDestination(session, "queue://cookies");
            using (var consumer = session.CreateConsumer(destination))
            {
                while (true)
                {
                    // With prefetch = 0, each Receive() sends a pull request with a 1 second timeout
                    var message = consumer.Receive(TimeSpan.FromSeconds(1));
                    if (message is Apache.NMS.ITextMessage textMessage)
                    {
                        Console.WriteLine($"{DateTimeOffset.Now}\t{name} received message - {textMessage.Text}");
                        Console.WriteLine($"{DateTimeOffset.Now}\t{name} unconsumed messages after receive - {((Apache.NMS.ActiveMQ.MessageConsumer)consumer).UnconsumedMessageCount}");
                        // Simulate slow processing before acknowledging
                        await Task.Delay(processingTime);
                        message.Acknowledge();
                    }
                    else
                    {
                        await Task.Delay(TimeSpan.FromSeconds(1));
                    }
                }
            }
        }
    }
}

{code}
 

 

Here is the output:
{code:java}
05.02.2021 16:04:55 +01:00      Sending message 5_05.02.2021 16:04:55 +01:00_97a6857b-d003-4c63-ba11-16a6825e6595
05.02.2021 16:04:55 +01:00      Sending message 6_05.02.2021 16:04:55 +01:00_4581b335-2483-42ac-a9c5-fe530a870ee1
05.02.2021 16:04:55 +01:00      Sending message 7_05.02.2021 16:04:55 +01:00_2ad2c9d4-aab6-47be-b51c-a1faacd6a03c
05.02.2021 16:04:55 +01:00      Sending message 8_05.02.2021 16:04:55 +01:00_866bca6b-9237-491b-981f-d73ae7c025fb
05.02.2021 16:04:55 +01:00      Sending message 9_05.02.2021 16:04:55 +01:00_a60b926c-96b2-4989-8f5f-ec85e5cfb071
Press enter to exit
05.02.2021 16:04:56 +01:00      slow consumer received message - 0_05.02.2021 16:04:55 +01:00_ba96ec80-3008-4c95-987c-5f82a1aaa239
05.02.2021 16:04:56 +01:00      slow consumer unconsumed messages after receive - 3
05.02.2021 16:05:06 +01:00      fast consumer received message - 3_05.02.2021 16:04:55 +01:00_34df251c-da75-4145-8da4-efeecea3318d
05.02.2021 16:05:06 +01:00      fast consumer unconsumed messages after receive - 2
05.02.2021 16:05:17 +01:00      fast consumer received message - 5_05.02.2021 16:04:55 +01:00_97a6857b-d003-4c63-ba11-16a6825e6595
05.02.2021 16:05:17 +01:00      fast consumer unconsumed messages after receive - 1
05.02.2021 16:05:28 +01:00      fast consumer received message - 7_05.02.2021 16:04:55 +01:00_2ad2c9d4-aab6-47be-b51c-a1faacd6a03c
05.02.2021 16:05:28 +01:00      fast consumer unconsumed messages after receive - 0
05.02.2021 16:05:39 +01:00      fast consumer received message - 8_05.02.2021 16:04:55 +01:00_866bca6b-9237-491b-981f-d73ae7c025fb
05.02.2021 16:05:39 +01:00      fast consumer unconsumed messages after receive - 0
05.02.2021 16:05:50 +01:00      fast consumer received message - 9_05.02.2021 16:04:55 +01:00_a60b926c-96b2-4989-8f5f-ec85e5cfb071
05.02.2021 16:05:50 +01:00      fast consumer unconsumed messages after receive - 0
05.02.2021 16:06:57 +01:00      slow consumer received message - 2_05.02.2021 16:04:55 +01:00_86962504-1b4b-4f2e-b359-585e535042e9
05.02.2021 16:06:57 +01:00      slow consumer unconsumed messages after receive - 2
05.02.2021 16:08:58 +01:00      slow consumer received message - 4_05.02.2021 16:04:55 +01:00_2d7ce61f-0240-4278-a779-936fd4f3a3e7
05.02.2021 16:08:58 +01:00      slow consumer unconsumed messages after receive - 1
05.02.2021 16:10:59 +01:00      slow consumer received message - 6_05.02.2021 16:04:55 +01:00_4581b335-2483-42ac-a9c5-fe530a870ee1
05.02.2021 16:10:59 +01:00      slow consumer unconsumed messages after receive - 0
 
{code}

As you can see, the consumers received more than one message at a time (according to the unconsumed message count, which is the number of messages that were sent to the client). When I experimented with this, the number of messages received was usually equal to the number of Receive() calls I had made.
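
To make the suspected mechanism concrete, here is a toy model of it. This is only a sketch of the hypothesis, not Artemis or NMS code: the PullModel class, the Dispatch method and the simulated clock are all made up for illustration, and it assumes the broker keeps one pending entry per pull request and hands one message to each entry when messages arrive.

{code:java}
using System;
using System.Collections.Generic;

// Toy model of the hypothesis only. Nothing here is a real Artemis or NMS type.
static class PullModel
{
    // Returns how many messages the broker would dispatch to one consumer at time `now`.
    // pullExpiries holds the expiry time (in seconds) of each pull request the consumer sent.
    // With honorTimeout = false, expired pull requests still receive a message each.
    public static int Dispatch(IEnumerable<double> pullExpiries, int queued, double now, bool honorTimeout)
    {
        int dispatched = 0;
        foreach (var expiresAt in pullExpiries)
        {
            if (dispatched == queued) break;                 // queue is empty
            if (honorTimeout && expiresAt <= now) continue;  // expired pull is dropped
            dispatched++;
        }
        return dispatched;
    }

    static void Main()
    {
        // Three Receive(1s) calls issued at t = 0, 2 and 4 expire at t = 1, 3 and 5.
        var pullExpiries = new List<double> { 1.0, 3.0, 5.0 };

        // Ten messages arrive at t = 5, after all three pulls have expired.
        Console.WriteLine($"timeout honored: {Dispatch(pullExpiries, 10, 5.0, true)} message(s) dispatched");
        Console.WriteLine($"timeout ignored: {Dispatch(pullExpiries, 10, 5.0, false)} message(s) dispatched");
    }
}
{code}

In this model, honoring the timeout dispatches nothing until the consumer sends a new pull request, while ignoring it dispatches one message per earlier Receive() call, which matches the pattern in the output above.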



--
This message was sent by Atlassian Jira
(v8.3.4#803005)