You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/04/20 10:11:10 UTC

svn commit: r530683 - in /incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client: MessageListenerMultiConsumerImmediatePrefetch.java MessageListenerMultiConsumerTest.java

Author: ritchiem
Date: Fri Apr 20 01:11:05 2007
New Revision: 530683

URL: http://svn.apache.org/viewvc?view=rev&rev=530683
Log:
Reinstated the two consumer receive test. 
Added additional test class to cover the IMMEDIATE_PREFETCHs. 

Added:
    incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java   (with props)
Modified:
    incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java

Added: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java?view=auto&rev=530683
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java (added)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java Fri Apr 20 01:11:05 2007
@@ -0,0 +1,70 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import java.util.Hashtable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery
+ * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread
+ * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at
+ * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple
+ * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting
+ * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
+ * messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ */
+public class MessageListenerMultiConsumerImmediatePrefetch extends MessageListenerMultiConsumerTest
+{
+
+
+    protected void setUp() throws Exception
+    {
+
+        System.setProperty(AMQSession.IMMEDIATE_PREFETCH, "true");
+        super.setUp();
+
+    }
+
+
+    public static junit.framework.Test suite()
+    {
+        return new junit.framework.TestSuite(MessageListenerMultiConsumerImmediatePrefetch.class);
+    }
+}

Propchange: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java?view=diff&rev=530683&r1=530682&r2=530683
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java Fri Apr 20 01:11:05 2007
@@ -62,7 +62,8 @@
     private Connection _clientConnection;
     private MessageConsumer _consumer1;
     private MessageConsumer _consumer2;
-
+    private Session _clientSession1;
+    private Queue _queue;
     private final CountDownLatch _allMessagesSent = new CountDownLatch(2); //all messages Sent Lock
 
 
@@ -76,25 +77,25 @@
         Hashtable<String, String> env = new Hashtable<String, String>();
 
         env.put("connectionfactory.connection", "amqp://guest:guest@MLT_ID/test?brokerlist='vm://:1'");
-        env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+        env.put("queue.queue", "direct://amq.direct//"+this.getClass().getName());
 
         _context = factory.getInitialContext(env);
 
-        Queue queue = (Queue) _context.lookup("queue");
+        _queue = (Queue) _context.lookup("queue");
 
         //Create Client 1
         _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
 
         _clientConnection.start();
 
-        Session clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        _clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-        _consumer1 = clientSession1.createConsumer(queue);
+        _consumer1 = _clientSession1.createConsumer(_queue);
 
         //Create Client 2
         Session clientSession2 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-        _consumer2 = clientSession2.createConsumer(queue);
+        _consumer2 = clientSession2.createConsumer(_queue);
 
         //Create Producer
         Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
@@ -104,7 +105,7 @@
 
         Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-        MessageProducer producer = producerSession.createProducer(queue);
+        MessageProducer producer = producerSession.createProducer(_queue);
 
         for (int msg = 0; msg < MSG_COUNT; msg++)
         {
@@ -123,20 +124,6 @@
         TransportConnection.killAllVMBrokers();
     }
 
-//    public void testRecieveC1thenC2() throws Exception
-//    {
-//
-//        for (int msg = 0; msg < MSG_COUNT / 2; msg++)
-//        {
-//
-//            assertTrue(_consumer1.receive() != null);
-//        }
-//
-//        for (int msg = 0; msg < MSG_COUNT / 2; msg++)
-//        {
-//            assertTrue(_consumer2.receive() != null);
-//        }
-//    }
 
     public void testRecieveInterleaved() throws Exception
     {
@@ -206,14 +193,53 @@
         assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
     }
 
-    public void testRecieveC2Only_OnlyRunWith_REGISTER_CONSUMERS_FLOWED() throws Exception
+    public void testRecieveC2Only() throws Exception
     {
-        if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false")))
+        if (!Boolean.parseBoolean(System.getProperties().
+                getProperty(AMQSession.IMMEDIATE_PREFETCH, AMQSession.IMMEDIATE_PREFETCH_DEFAULT)))
         {
+            _logger.info("Performing Receive only on C2");
             for (int msg = 0; msg < MSG_COUNT; msg++)
             {
                 assertTrue(MSG_COUNT + " msg should be received. Only received:" + msg,
                            _consumer2.receive(1000) != null);
+            }
+        }
+    }
+
+    public void testRecieveBoth() throws Exception
+    {
+        if (!Boolean.parseBoolean(System.getProperties().
+                getProperty(AMQSession.IMMEDIATE_PREFETCH, AMQSession.IMMEDIATE_PREFETCH_DEFAULT)))
+        {
+            _logger.info("Performing Receive only with two consumers on one session ");
+
+            MessageConsumer consumer2 = _clientSession1.createConsumer(_queue);
+
+            for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+            {
+
+                assertTrue(_consumer1.receive() != null);
+            }
+
+            for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+            {
+                assertTrue(consumer2.receive() != null);
+            }
+        }
+        else
+        {
+            _logger.info("Performing Receive only on both C1 and C2");
+
+            for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+            {
+
+                assertTrue(_consumer1.receive() != null);
+            }
+
+            for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+            {
+                assertTrue(_consumer2.receive() != null);
             }
         }
     }