You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by "Alex Rudyy (JIRA)" <ji...@apache.org> on 2017/02/16 17:17:41 UTC
[jira] [Updated] (QPIDJMS-264) Incoming messages are dispatched out
of order
[ https://issues.apache.org/jira/browse/QPIDJMS-264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alex Rudyy updated QPIDJMS-264:
-------------------------------
Attachment: out-of-order.log.tgz
The combined client and broker logs (out-of-order.log.tgz) are attached to the JIRA.
I used the following code to reproduce the issue
{code}
public void testDeliveryOrder() throws Exception
{
for(int i=0;i<100;i++)
{
LOGGER.debug("### iteration " + i);
performTest();
}
}
private void performTest() throws Exception {
Map<String, String> enableFrameLogging = Collections.singletonMap("amqp.traceFrames", "true");
Connection consumingConnection = getConnectionWithOptions(enableFrameLogging);
Connection producingConnection = getConnectionWithOptions(enableFrameLogging);
try
{
Session consumerSession = consumingConnection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = consumerSession.createConsumer(_queue);
Session producerSession = producingConnection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = producerSession.createProducer(_queue);
for (int i = 0; i < 4; i++)
{
producer.send(producerSession.createTextMessage("msg" + (i + 1)));
}
producerSession.commit();
consumingConnection.start();
for (int i = 0; i < 4; i++)
{
TextMessage tm = (TextMessage) consumer.receive();
LOGGER.debug("#### Received:" + tm.getText());
if (!("msg" + (i + 1)).equals(tm.getText()))
{
fail("Out of order delivery");
}
}
consumerSession.commit();
}
finally
{
producingConnection.close();
consumingConnection.close();
}
}
{code}
I noticed that 2 connections are required for the issue to reproduce.
Please let me know if wireshark frames are actually required. It does not look to me like an issue with a protocol implementation when messages are sent in wrong order from broker. I think that messages sent and received in the right order by client dispatches them in a wrong order.
Just in case, here is my jvm version:
{noformat}
$ java -version
java version "1.8.0_121"
Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)
{noformat}
My environment is
{noformat}
$ cat /etc/redhat-release
Fedora release 25 (Twenty Five)
$ uname -a
Linux localhost.localdomain 4.9.9-200.fc25.x86_64 #1 SMP Thu Feb 9 17:28:13 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
{noformat}
> Incoming messages are dispatched out of order
> ----------------------------------------------
>
> Key: QPIDJMS-264
> URL: https://issues.apache.org/jira/browse/QPIDJMS-264
> Project: Qpid JMS
> Issue Type: Bug
> Components: qpid-jms-client
> Affects Versions: 0.20.0
> Reporter: Alex Rudyy
> Priority: Critical
> Attachments: out-of-order.log.tgz
>
>
> Both current trunk version and version 0.20.0 of JMS Client can dispatch incoming messages out of order into client application. I did not test earlier versions. The messages are received in the right order but Client dispatches them occasionally out of order.
> I used the following code to reproduce the issue with trunk version of Java Broker.
> {code}
> public static void main(String[] args) throws Exception
> {
> for (int i=0;i<100;i++)
> {
> execute(i);
> }
> }
> public static void execute(int iteration) throws Exception
> {
> System.out.println("Iteration " + iteration);
> Connection managingConnection = createConnection();
> Session session = managingConnection.createSession(Session.SESSION_TRANSACTED);
> Connection consumingConnection = createConnection();
> Session consumerSession = consumingConnection.createSession(true, Session.SESSION_TRANSACTED);
> Queue queue = createTestQueue(session, "Q3-test");
> session.commit();
> MessageConsumer consumer = consumerSession.createConsumer(queue);
> Connection producingConnection = createConnection();
> Session producerSession = producingConnection.createSession(true, Session.SESSION_TRANSACTED);
> MessageProducer producer = producerSession.createProducer(queue);
> producer.send(producerSession.createTextMessage("msg1"));
> producer.send(producerSession.createTextMessage("msg2"));
> producer.send(producerSession.createTextMessage("msg3"));
> producer.send(producerSession.createTextMessage("msg4"));
> producerSession.commit();
> consumingConnection.start();
> TextMessage tm = (TextMessage) consumer.receive();
> System.out.println(">" + tm.getText());
> if (!"msg1".equals(tm.getText()))
> {
> throw new RuntimeException("Unexpected");
> }
> deleteQueue(session, "Q3-test");
> consumingConnection.close();
> producingConnection.close();
> managingConnection.close();
> }
> private static Queue createTestQueue(Session session, String queueName) throws JMSException
> {
> MessageProducer producer = session.createProducer(session.createQueue("$management"));
> MapMessage createMessage = session.createMapMessage();
> createMessage.setStringProperty("type", "org.apache.qpid.Queue");
> createMessage.setStringProperty("operation", "CREATE");
> createMessage.setString("name", queueName);
> createMessage.setString("object-path", "org.apache.qpid.Queue");
> producer.send(createMessage);
> if (session.getTransacted())
> {
> session.commit();
> }
> return session.createQueue(queueName);
> }
> private static void deleteQueue(Session session, String queueName)throws JMSException
> {
> MessageProducer producer = session.createProducer(session.createQueue("$management"));
> MapMessage createMessage = session.createMapMessage();
> createMessage.setStringProperty("type", "org.apache.qpid.Queue");
> createMessage.setStringProperty("operation", "DELETE");
> createMessage.setStringProperty("index", "object-path");
> createMessage.setStringProperty("key", queueName);
> producer.send(createMessage);
> if (session.getTransacted())
> {
> session.commit();
> }
> }
> private static Connection createConnection() throws JMSException, NamingException {
> Properties properties = new Properties();
> properties.put("java.naming.factory.initial", "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
> properties.put("connectionfactory.myFactoryLookup", "amqp://localhost:5672");
> Context context = new InitialContext(properties);
> try
> {
> ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup");
> return factory.createConnection("admin", "admin");
> }
> finally
> {
> context.close();
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org