You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "daves (Jira)" <ji...@apache.org> on 2022/10/13 10:27:00 UTC
[jira] [Created] (ARTEMIS-4047) Artemis does not send message to consumer AMQP
daves created ARTEMIS-4047:
------------------------------
Summary: Artemis does not send message to consumer AMQP
Key: ARTEMIS-4047
URL: https://issues.apache.org/jira/browse/ARTEMIS-4047
Project: ActiveMQ Artemis
Issue Type: Bug
Components: AMQP, Broker
Affects Versions: 2.25.0
Reporter: daves
Attachments: 1.PNG, 2.PNG, 3.PNG, 4.PNG, 5.PNG, All.zip
The broker does not send messages from one of many existing queues to the connected consumer.
According to the UI the queue does contain ~15k messages.
I’m not able to consume any of these messages. I also tried to read a message using the browse function of the UI/console but that does not work eighter.
The message was created by a AMQP client and should be consumed by another AMQP client.
I tried to capture the situation in a few screenshots…
I don’t know which data can help you to understand the situation, so I’ve collected everything:
* Logs
* Broker
* Data
Please let me know if there are any other data I should add to the ticket.
I don’t think that the code of my client is relevant since the problem only exist for a single queue…but here it is anyway:
{code:java}
using Amqp;
using Amqp.Framing;
using Amqp.Types;
namespace Test;
public sealed class MessageConsumer
{
private readonly String _address;
private readonly CancellationToken _cancellationToken;
private readonly String _consumerName;
private readonly String[] _destinations;
public MessageConsumer( String address, String consumerName, String[] destinations, CancellationToken cancellationToken )
{
_address = address;
_consumerName = consumerName;
_destinations = destinations;
_cancellationToken = cancellationToken;
}
public async Task StartReceivingMessages()
{
await Task.Yield();
while ( !_cancellationToken.IsCancellationRequested )
{
var connectionFactory = new ConnectionFactory();
var address = new Address( _address );
try
{
var connection = await connectionFactory.CreateAsync( address );
var session = ( (IConnection) connection ).CreateSession();
var receivers = new List<IReceiverLink>();
foreach ( var destination in _destinations )
{
var receiver = session.CreateReceiver( $"{_consumerName}_{destination}",
new Source
{
Address = destination,
Capabilities = new[] { new Symbol( "queue" ) }
} );
receivers.Add( receiver );
}
while ( !_cancellationToken.IsCancellationRequested )
foreach ( var receiver in receivers )
{
// ReceiveAsync( TimeSpan.Zero ); blocks forever and no messages will be received
var message = await receiver.ReceiveAsync( TimeSpan.FromMilliseconds( 1 ) );
if ( message == null )
continue;
receiver.Accept( message );
Console.WriteLine( $"{_consumerName} - Received message with id: '{message.Properties.MessageId}'" );
}
}
catch ( Exception ex )
{
Console.WriteLine( $"{_consumerName} - Connection error in producer '{_consumerName}' {ex.Message} => create new connection." );
await Task.Delay( 1000, CancellationToken.None );
}
}
}
}
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)