You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/09/13 12:53:33 UTC

svn commit: r575253 - in /incubator/qpid/branches/M2/java: broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java

Author: ritchiem
Date: Thu Sep 13 03:53:29 2007
New Revision: 575253

URL: http://svn.apache.org/viewvc?rev=575253&view=rev
Log:
QPID-572 Applied test patch supplied by Aidan Skinner along with change to ConcurrentSelectorDeliveryManager that resolves the ordering problem.

Added:
    incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java   (with props)
Modified:
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=575253&r1=575252&r2=575253&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Thu Sep 13 03:53:29 2007
@@ -747,7 +747,7 @@
         {
             Subscription s = _subscriptions.nextSubscriber(msg);
 
-            if (s == null) //no-one can take the message right now.
+            if (s == null || hasQueuedMessages()) //no-one can take the message right now or we're queueing
             {
                 if (debugEnabled)
                 {
@@ -795,6 +795,12 @@
             }
             else
             {
+
+                if (_messages.size() > 0)
+                {
+                    _log.error("Direct delivery with queued msgs:" + _messages.size());
+                }
+
                 //release lock now
                 _lock.unlock();
                 synchronized (s.getSendLock())

Added: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java?rev=575253&view=auto
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java (added)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java Thu Sep 13 03:53:29 2007
@@ -0,0 +1,152 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class tests that messages sent to an AMQQueue are received in the order
+ * they were sent, even when a second batch is produced while consuming
+ */
+public class AMQQueueDeferredOrderingTest extends TestCase
+{
+
+    private static final int NUM_MESSAGES = 1000;
+
+    private AMQConnection con;
+    private Session session;
+    private AMQQueue queue;
+    private MessageConsumer consumer;
+
+    private static final Logger _logger = LoggerFactory.getLogger(AMQQueueDeferredOrderingTest.class);
+
+    private ASyncProducer producerThread;
+    private static final String BROKER = "vm://:1";
+
+    private class ASyncProducer extends Thread
+    {
+
+        private MessageProducer producer;
+        private final Logger _logger = LoggerFactory.getLogger(ASyncProducer.class);
+        private Session session;
+        private int start;
+        private int end;
+
+        public ASyncProducer(AMQQueue q, int start, int end) throws Exception
+        {
+            this.session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            this._logger.info("Create Consumer of Q1");
+            this.producer = this.session.createProducer(q);
+            this.start = start;
+            this.end = end;
+        }
+
+        public void run()
+        {
+            try
+            {
+                this._logger.info("Starting to send messages");
+                for (int i = start; i < end && !interrupted(); i++)
+                {
+                    producer.send(session.createTextMessage(Integer.toString(i)));
+                }
+                this._logger.info("Sent " + (end - start) + " messages");
+            }
+            catch (JMSException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        TransportConnection.createVMBroker(1);
+
+        _logger.info("Create Connection");
+        con = new AMQConnection(BROKER, "guest", "guest", "OrderingTest", "test");
+        _logger.info("Create Session");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        _logger.info("Create Q");
+        queue = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q"), new AMQShortString("Q"),
+                false, true);
+        _logger.info("Create Consumer of Q");
+        consumer = session.createConsumer(queue);
+        _logger.info("Start Connection");
+        con.start();
+    }
+
+    public void testPausedOrder() throws Exception
+    {
+
+        // Setup initial messages
+        _logger.info("Creating first producer thread");
+        producerThread = new ASyncProducer(queue, 0, NUM_MESSAGES / 2);
+        producerThread.start();
+        // Wait for them to be done
+        producerThread.join();
+
+        // Setup second set of messages to produce while we consume
+        _logger.info("Creating second producer thread");
+        producerThread = new ASyncProducer(queue, NUM_MESSAGES / 2, NUM_MESSAGES);
+        producerThread.start();
+
+        // Start consuming and checking they're in order
+        _logger.info("Consuming messages");
+        for (int i = 0; i < NUM_MESSAGES; i++)
+        {
+            Message msg = consumer.receive(1500);
+            assertNotNull("Message should not be null", msg);
+            assertTrue("Message should be a text message", msg instanceof TextMessage);
+            assertEquals("Message content does not match expected", Integer.toString(i), ((TextMessage) msg).getText());
+        }
+    }
+
+    protected void tearDown() throws Exception
+    {
+        _logger.info("Interuptting producer thread");
+        producerThread.interrupt();
+        _logger.info("Closing connection");
+        con.close();
+
+        TransportConnection.killAllVMBrokers();
+        super.tearDown();
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new junit.framework.TestSuite(AMQQueueDeferredOrderingTest.class);
+    }
+
+}

Propchange: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date