You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gr...@apache.org on 2011/02/17 15:34:11 UTC

svn commit: r1071620 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java

Author: grkvlt
Date: Thu Feb 17 14:34:10 2011
New Revision: 1071620

URL: http://svn.apache.org/viewvc?rev=1071620&view=rev
Log:
QPID-3047: Fix QueueDepthWithSelectorTest on 0-10

Refactor the test to extend QpidBrokerTestCase, and fix the 0-10 client session to flush pending acknowledgments before querying the queue depth

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1071620&r1=1071619&r2=1071620&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Feb 17 14:34:10 2011
@@ -942,6 +942,7 @@ public class AMQSession_0_10 extends AMQ
 
     protected Long requestQueueDepth(AMQDestination amqd)
     {
+        flushAcknowledgments();
         return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount();
     }
 

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java?rev=1071620&r1=1071619&r2=1071620&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java Thu Feb 17 14:34:10 2011
@@ -21,30 +21,18 @@
 
 package org.apache.qpid.server.queue;
 
-import junit.framework.TestCase;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
-
 import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
-import javax.naming.Context;
-import javax.naming.NamingException;
-import javax.naming.spi.InitialContextFactory;
-import java.util.Hashtable;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 /**
  * Test Case to ensure that messages are correctly returned.
@@ -52,19 +40,12 @@ import java.util.Hashtable;
  * - The message is returned.
  * - The broker doesn't leak memory.
  * - The broker's state is correct after test.
- *
- * Why is this hardcoded to InVM testing, should be converted to QTC.
  */
-public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase
+public class QueueDepthWithSelectorTest extends QpidBrokerTestCase
 {
-    protected static final Logger _logger = Logger.getLogger(QueueDepthWithSelectorTest.class);
-
-    protected final String BROKER = "vm://:"+ApplicationRegistry.DEFAULT_INSTANCE;
     protected final String VHOST = "test";
     protected final String QUEUE = this.getClass().getName();
 
-    protected Context _context;
-
     protected Connection _clientConnection;
     protected Connection _producerConnection;
     private Session _clientSession;
@@ -82,47 +63,21 @@ public class QueueDepthWithSelectorTest 
     public void setUp() throws Exception
     {
         super.setUp();
-        TransportConnection.createVMBroker(ApplicationRegistry.DEFAULT_INSTANCE);
-
-        System.err.println("amqj.logging.level:" + System.getProperty("amqj.logging.level"));
-        System.err.println("_logger.level:" + _logger.getLevel());
-        System.err.println("_logger.isE-Error:" + _logger.isEnabledFor(Level.ERROR));
-        System.err.println("_logger.isE-Warn:" + _logger.isEnabledFor(Level.WARN));
-        System.err.println("_logger.isInfo:" + _logger.isInfoEnabled() + ":" + _logger.isEnabledFor(Level.INFO));
-        System.err.println("_logger.isDebug:" + _logger.isDebugEnabled() + ":" + _logger.isEnabledFor(Level.DEBUG));
-        System.err.println("_logger.isTrace:" + _logger.isTraceEnabled() + ":" + _logger.isEnabledFor(Level.TRACE));
-
-        System.err.println(Logger.getRootLogger().getLoggerRepository());
-
-        InitialContextFactory factory = new PropertiesFileInitialContextFactory();
-
-        Hashtable<String, String> env = new Hashtable<String, String>();
-
-        env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID/" + VHOST + "?brokerlist='" + BROKER + "'");
-        env.put("queue.queue", QUEUE);
-
-        _context = factory.getInitialContext(env);
 
         _messages = new Message[MSG_COUNT];
-        _queue = (Queue) _context.lookup("queue");
-        init();
-    }
-
-    @Override
-    public void tearDown() throws Exception
-    {
-        if (_producerConnection != null)
-        {
-            _producerConnection.close();
-        }
-
-        if (_clientConnection != null)
-        {
-            _clientConnection.close();
-        }
+        _queue = getTestQueue();
+        
+        //Create Producer
+        _producerConnection = getConnection();
+        _producerConnection.start();
+        _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        _producer = _producerSession.createProducer(_queue);
 
-        TransportConnection.killVMBroker(ApplicationRegistry.DEFAULT_INSTANCE);
-        super.tearDown();        
+        // Create consumer
+        _clientConnection = getConnection();
+        _clientConnection.start();
+        _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        _consumer = _clientSession.createConsumer(_queue, "key = 23");
     }
 
     public void test() throws Exception
@@ -139,7 +94,8 @@ public class QueueDepthWithSelectorTest 
 
         //Verify we get all the messages.
         _logger.info("Verifying messages");
-        verifyAllMessagesRecevied(0);
+        verifyAllMessagesRecevied(50);
+        verifyBrokerState(0);
 
         //Close the connection.. .giving the broker time to clean up its state.
         _clientConnection.close();
@@ -149,39 +105,18 @@ public class QueueDepthWithSelectorTest 
         verifyBrokerState(0);
     }
 
-    protected void init() throws NamingException, JMSException, AMQException
-    {
-        //Create Producer
-        _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
-        _producerConnection.start();
-        _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        _producer = _producerSession.createProducer(_queue);
-
-        // Create consumer
-        _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
-        _clientConnection.start();
-        _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        _consumer = _clientSession.createConsumer(_queue, "key = 23");
-    }
-
     protected void verifyBrokerState(int expectedDepth)
     {
         try
         {
-            _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
-
-            _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        }
-        catch (Exception e)
-        {
-            fail(e.getMessage());
-        }
+            Connection connection = getConnection();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-        try
-        {
             Thread.sleep(2000);
-            long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue);
+            long queueDepth = ((AMQSession) session).getQueueDepth((AMQDestination) _queue);
             assertEquals("Session reports Queue depth not as expected", expectedDepth, queueDepth);
+            
+            connection.close();
         }
         catch (InterruptedException e)
         {
@@ -191,34 +126,22 @@ public class QueueDepthWithSelectorTest 
         {
             fail(e.getMessage());
         }
-        finally
+        catch (Exception e)
         {
-            try
-            {
-                _clientConnection.close();
-            }
-            catch (JMSException e)
-            {
-                fail(e.getMessage());
-            }
+            fail(e.getMessage());
         }
-
     }
 
     protected void verifyAllMessagesRecevied(int expectedDepth) throws Exception
     {
-
         boolean[] msgIdRecevied = new boolean[MSG_COUNT];
 
-        for (int i = 0; i < MSG_COUNT; i++)
+        for (int i = 0; i < expectedDepth; i++)
         {
             _messages[i] = _consumer.receive(1000);
             assertNotNull("should have received a message but didn't", _messages[i]);
         }
-
-        long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue);
-        assertEquals("Session reports Queue depth not as expected", expectedDepth, queueDepth);
-
+        
         //Check received messages
         int msgId = 0;
         for (Message msg : _messages)
@@ -231,7 +154,7 @@ public class QueueDepthWithSelectorTest 
         }
 
         //Check all received
-        for (msgId = 0; msgId < MSG_COUNT; msgId++)
+        for (msgId = 0; msgId < expectedDepth; msgId++)
         {
             assertTrue("Message " + msgId + " not received.", msgIdRecevied[msgId]);
         }
@@ -241,9 +164,6 @@ public class QueueDepthWithSelectorTest 
      * Get the next message putting the given count into the intProperties as ID.
      *
      * @param msgNo the message count to store as ID.
-     *
-     * @return
-     *
      * @throws JMSException
      */
     protected Message nextMessage(int msgNo) throws JMSException
@@ -253,5 +173,4 @@ public class QueueDepthWithSelectorTest 
         send.setIntProperty("key", 23);
         return send;
     }
-
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org