You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/04/11 00:12:57 UTC

svn commit: r764081 - in /qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue: QueueDepthWithSelectorTest.java QueueDepthWithSelectorUsingFlowToDiskTest.java

Author: ritchiem
Date: Fri Apr 10 22:12:57 2009
New Revision: 764081

URL: http://svn.apache.org/viewvc?rev=764081&view=rev
Log:
QPID-1803 : Test that selectors work when a QueueDepth is set.

Added:
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorUsingFlowToDiskTest.java
Modified:
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java?rev=764081&r1=764080&r2=764081&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java Fri Apr 10 22:12:57 2009
@@ -24,7 +24,6 @@
 import junit.framework.TestCase;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
-import org.apache.log4j.PropertyConfigurator;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession;
@@ -53,22 +52,26 @@
  */
 public class QueueDepthWithSelectorTest extends TestCase
 {
-    private static final Logger _logger = Logger.getLogger(QueueDepthWithSelectorTest.class);
+    protected static final Logger _logger = Logger.getLogger(QueueDepthWithSelectorTest.class);
 
     protected final String BROKER = "vm://:1";
     protected final String VHOST = "test";
     protected final String QUEUE = this.getClass().getName();
 
-    private Context _context;
+    protected Context _context;
 
-    private Connection _clientConnection, _producerConnection;
-    private Session _clientSession, _producerSession;
-    private MessageProducer _producer;
+    protected Connection _clientConnection;
+    protected Connection _producerConnection;
+    private Session _clientSession;
+    protected Session _producerSession;
+    protected MessageProducer _producer;
     private MessageConsumer _consumer;
 
-    private static final int MSG_COUNT = 50;
+    protected static int MSG_COUNT = 50;
 
-    private Message[] _messages = new Message[MSG_COUNT];
+    protected Message[] _messages = new Message[MSG_COUNT];
+
+    protected Queue _queue;
 
     protected void setUp() throws Exception
     {
@@ -96,6 +99,9 @@
 
         _context = factory.getInitialContext(env);
 
+        _messages = new Message[MSG_COUNT];
+        _queue = (Queue) _context.lookup("queue");
+        init();
     }
 
     protected void tearDown() throws Exception
@@ -120,8 +126,6 @@
 
     public void test() throws Exception
     {
-
-        init();
         //Send messages
         _logger.info("Starting to send messages");
         for (int msg = 0; msg < MSG_COUNT; msg++)
@@ -134,34 +138,32 @@
 
         //Verify we get all the messages.
         _logger.info("Verifying messages");
-        verifyAllMessagesRecevied();
+        verifyAllMessagesRecevied(0);
 
         //Close the connection.. .giving the broker time to clean up its state.
         _clientConnection.close();
 
         //Verify Broker state
         _logger.info("Verifying broker state");
-        verifyBrokerState();
+        verifyBrokerState(0);
     }
 
-    private void init() throws NamingException, JMSException
+    protected void init() throws NamingException, JMSException, AMQException
     {
-        _messages = new Message[MSG_COUNT];
-
         //Create Producer
         _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
         _producerConnection.start();
         _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        _producer = _producerSession.createProducer((Queue) _context.lookup("queue"));
+        _producer = _producerSession.createProducer(_queue);
 
         // Create consumer
         _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
         _clientConnection.start();
         _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        _consumer = _clientSession.createConsumer((Queue) _context.lookup("queue"), "key = 23");
+        _consumer = _clientSession.createConsumer(_queue, "key = 23");
     }
 
-    private void verifyBrokerState()
+    protected void verifyBrokerState(int expectedDepth)
     {
         try
         {
@@ -177,17 +179,13 @@
         try
         {
             Thread.sleep(2000);
-            long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _context.lookup("queue"));
-            assertEquals("Session reports Queue depth not as expected", 0, queueDepth);
+            long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue);
+            assertEquals("Session reports Queue depth not as expected", expectedDepth, queueDepth);
         }
         catch (InterruptedException e)
         {
             fail(e.getMessage());
         }
-        catch (NamingException e)
-        {
-            fail(e.getMessage());
-        }
         catch (AMQException e)
         {
             fail(e.getMessage());
@@ -206,7 +204,7 @@
 
     }
 
-    private void verifyAllMessagesRecevied() throws Exception
+    protected void verifyAllMessagesRecevied(int expectedDepth) throws Exception
     {
 
         boolean[] msgIdRecevied = new boolean[MSG_COUNT];
@@ -216,8 +214,9 @@
             _messages[i] = _consumer.receive(1000);
             assertNotNull("should have received a message but didn't", _messages[i]);
         }
-        long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _context.lookup("queue"));
-        assertEquals("Session reports Queue depth not as expected", 0, queueDepth);
+
+        long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue);
+        assertEquals("Session reports Queue depth not as expected", expectedDepth, queueDepth);
 
         //Check received messages
         int msgId = 0;
@@ -246,8 +245,7 @@
      *
      * @throws JMSException
      */
-
-    private Message nextMessage(int msgNo) throws JMSException
+    protected Message nextMessage(int msgNo) throws JMSException
     {
         Message send = _producerSession.createTextMessage("MessageReturnTest");
         send.setIntProperty("ID", msgNo);

Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorUsingFlowToDiskTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorUsingFlowToDiskTest.java?rev=764081&view=auto
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorUsingFlowToDiskTest.java (added)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorUsingFlowToDiskTest.java Fri Apr 10 22:12:57 2009
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.AMQException;
+
+import javax.jms.Session;
+import javax.jms.Message;
+import javax.jms.ConnectionFactory;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.naming.NamingException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class QueueDepthWithSelectorUsingFlowToDiskTest extends QueueDepthWithSelectorTest
+{
+
+    @Override
+    public void init() throws NamingException, JMSException, AMQException
+    {
+        //Incresae the number of messages to send
+        MSG_COUNT = 100;
+
+        //Resize the array
+        _messages = new Message[MSG_COUNT];
+
+
+        Map<String, Object> arguments = new HashMap<String, Object>();
+
+        //Ensure we can call createQueue with a priority int value
+        arguments.put(AMQQueueFactory.QPID_POLICY_TYPE.toString(), AMQQueueFactory.QPID_FLOW_TO_DISK);
+        // each message in the QueueDepthWithSelectorTest is 17 bytes each so only give space for half
+        arguments.put(AMQQueueFactory.QPID_MAX_SIZE.toString(), 8 * MSG_COUNT);
+
+        //Create the FlowToDisk Queue
+        Connection connection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+        AMQSession session = ((AMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
+        session.createQueue(new AMQShortString(getName()), false, false, false, arguments);
+
+        // Get a JMS reference to the new queue
+        _queue = session.createQueue(getName());
+        connection.close();
+
+        super.init();
+    }
+
+    public void testOnlyGettingHalf() throws Exception
+    {
+        //Send messages
+        _logger.info("Starting to send messages");
+        for (int msg = 0; msg < MSG_COUNT; msg++)
+        {
+            //Send a message that matches the selector
+            _producer.send(nextMessage(msg));
+
+            //Send one that doesn't
+            _producer.send(_producerSession.createTextMessage("MessageReturnTest"));
+        }
+
+        
+        _logger.info("Closing connection");
+        //Close the connection.. .giving the broker time to clean up its state.
+        _producerConnection.close();
+
+        //Verify we get all the messages.
+        _logger.info("Verifying messages");
+        // Expecting there to be MSG_COUNT on the queue as we have sent
+        // MSG_COUNT * (one that matches selector and one that doesn't)
+        verifyAllMessagesRecevied(MSG_COUNT);
+
+        //Close the connection.. .giving the broker time to clean up its state.
+        _clientConnection.close();
+
+        //Verify Broker state
+        _logger.info("Verifying broker state");
+        verifyBrokerState(MSG_COUNT);
+    }
+
+
+
+
+
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org