You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by "Alex Rudyy (JIRA)" <ji...@apache.org> on 2017/02/16 17:17:41 UTC

[jira] [Updated] (QPIDJMS-264) Incoming messages are dispatched out of order

     [ https://issues.apache.org/jira/browse/QPIDJMS-264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alex Rudyy updated QPIDJMS-264:
-------------------------------
    Attachment: out-of-order.log.tgz

The combined client and broker logs (out-of-order.log.tgz) are attached to the JIRA.

I used the following code to reproduce the issue
{code}
public void testDeliveryOrder() throws Exception
    {
        for(int i=0;i<100;i++)
        {
            LOGGER.debug("### iteration " + i);
            performTest();
        }
    }

    private void performTest() throws Exception {
        Map<String, String> enableFrameLogging = Collections.singletonMap("amqp.traceFrames", "true");
        Connection consumingConnection = getConnectionWithOptions(enableFrameLogging);
        Connection producingConnection = getConnectionWithOptions(enableFrameLogging);
        try
        {
            Session consumerSession = consumingConnection.createSession(true, Session.SESSION_TRANSACTED);
            MessageConsumer consumer = consumerSession.createConsumer(_queue);


            Session producerSession = producingConnection.createSession(true, Session.SESSION_TRANSACTED);
            MessageProducer producer = producerSession.createProducer(_queue);

            for (int i = 0; i < 4; i++)
            {
                producer.send(producerSession.createTextMessage("msg" + (i + 1)));
            }

            producerSession.commit();

            consumingConnection.start();
            for (int i = 0; i < 4; i++)
            {
                TextMessage tm = (TextMessage) consumer.receive();
                LOGGER.debug("#### Received:" + tm.getText());
                if (!("msg" + (i + 1)).equals(tm.getText()))
                {
                    fail("Out of order delivery");
                }
            }
            consumerSession.commit();
        }
        finally
        {
            producingConnection.close();
            consumingConnection.close();
        }
    }
{code}

I noticed that 2 connections are required for the issue to reproduce.
Please let me know if wireshark frames are actually required. It does not look to me like an issue with a protocol implementation when messages are sent in wrong order from broker. I think that messages sent and received in the right order by client dispatches them in a wrong order.

Just in case, here is my jvm version:
{noformat}
$ java -version
java version "1.8.0_121"
Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)
{noformat}
My environment is
{noformat}
$ cat /etc/redhat-release 
Fedora release 25 (Twenty Five)
$ uname -a
Linux localhost.localdomain 4.9.9-200.fc25.x86_64 #1 SMP Thu Feb 9 17:28:13 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
{noformat}


> Incoming messages are dispatched out of order 
> ----------------------------------------------
>
>                 Key: QPIDJMS-264
>                 URL: https://issues.apache.org/jira/browse/QPIDJMS-264
>             Project: Qpid JMS
>          Issue Type: Bug
>          Components: qpid-jms-client
>    Affects Versions: 0.20.0
>            Reporter: Alex Rudyy
>            Priority: Critical
>         Attachments: out-of-order.log.tgz
>
>
> Both current trunk version and version 0.20.0 of JMS Client can dispatch incoming messages out of order into client application. I did not test earlier versions. The messages are received in the right order but Client dispatches them occasionally out of order.
> I used the following code to reproduce the issue with trunk version of Java Broker.
> {code}
> public static void main(String[] args) throws Exception
>     {
>         for (int i=0;i<100;i++)
>         {
>             execute(i);
>         }
>     }
>     public static void execute(int iteration) throws Exception
>     {
>         System.out.println("Iteration " + iteration);
>         Connection managingConnection = createConnection();
>         Session session = managingConnection.createSession(Session.SESSION_TRANSACTED);
>         Connection consumingConnection = createConnection();
>         Session consumerSession = consumingConnection.createSession(true, Session.SESSION_TRANSACTED);
>         Queue queue = createTestQueue(session, "Q3-test");
>         session.commit();
>         MessageConsumer consumer = consumerSession.createConsumer(queue);
>         Connection producingConnection = createConnection();
>         Session producerSession = producingConnection.createSession(true, Session.SESSION_TRANSACTED);
>         MessageProducer producer = producerSession.createProducer(queue);
>         producer.send(producerSession.createTextMessage("msg1"));
>         producer.send(producerSession.createTextMessage("msg2"));
>         producer.send(producerSession.createTextMessage("msg3"));
>         producer.send(producerSession.createTextMessage("msg4"));
>         producerSession.commit();
>         consumingConnection.start();
>         TextMessage tm = (TextMessage) consumer.receive();
>         System.out.println(">" + tm.getText());
>         if (!"msg1".equals(tm.getText()))
>         {
>             throw new RuntimeException("Unexpected");
>         }
>         deleteQueue(session, "Q3-test");
>         consumingConnection.close();
>         producingConnection.close();
>         managingConnection.close();
>     }
>     private static Queue createTestQueue(Session session, String queueName) throws JMSException
>     {
>         MessageProducer producer = session.createProducer(session.createQueue("$management"));
>         MapMessage createMessage = session.createMapMessage();
>         createMessage.setStringProperty("type", "org.apache.qpid.Queue");
>         createMessage.setStringProperty("operation", "CREATE");
>         createMessage.setString("name", queueName);
>         createMessage.setString("object-path", "org.apache.qpid.Queue");
>         producer.send(createMessage);
>         if (session.getTransacted())
>         {
>             session.commit();
>         }
>         return session.createQueue(queueName);
>     }
>     private  static void deleteQueue(Session session, String queueName)throws JMSException
>     {
>         MessageProducer producer = session.createProducer(session.createQueue("$management"));
>         MapMessage createMessage = session.createMapMessage();
>         createMessage.setStringProperty("type", "org.apache.qpid.Queue");
>         createMessage.setStringProperty("operation", "DELETE");
>         createMessage.setStringProperty("index", "object-path");
>         createMessage.setStringProperty("key", queueName);
>         producer.send(createMessage);
>         if (session.getTransacted())
>         {
>             session.commit();
>         }
>     }
>     private static Connection createConnection() throws JMSException, NamingException {
>         Properties properties = new Properties();
>         properties.put("java.naming.factory.initial", "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
>         properties.put("connectionfactory.myFactoryLookup", "amqp://localhost:5672");
>         Context context = new InitialContext(properties);
>         try
>         {
>             ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup");
>             return factory.createConnection("admin", "admin");
>         }
>         finally
>         {
>             context.close();
>         }
>     }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org