You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/12/14 08:38:22 UTC

svn commit: r1774130 - in /qpid/java/trunk: systests/src/test/java/org/apache/qpid/server/queue/ systests/src/test/java/org/apache/qpid/test/client/queue/ test-profiles/

Author: kwall
Date: Wed Dec 14 08:38:21 2016
New Revision: 1774130

URL: http://svn.apache.org/viewvc?rev=1774130&view=rev
Log:
QPID-7546: Refactor LastValueQueueTest to avoid use of non-public APIs (use ADDRs for both the 0-9 and 0-10 paths)

Eliminated LVQTest which essentially duplicated LastValueQueueTest

Removed:
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/queue/LVQTest.java
Modified:
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
    qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes
    qpid/java/trunk/test-profiles/JavaPre010Excludes

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java?rev=1774130&r1=1774129&r2=1774130&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java Wed Dec 14 08:38:21 2016
@@ -41,11 +41,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.QpidException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.url.AMQBindingURL;
 
 public class LastValueQueueTest extends QpidBrokerTestCase
 {
@@ -65,13 +61,14 @@ public class LastValueQueueTest extends
     private Session _consumerSession;
     private MessageConsumer _consumer;
 
-    protected void setUp() throws Exception
+    @Override
+    public void setUp() throws Exception
     {
         super.setUp();
 
         _queueName = getTestQueueName();
         _producerConnection = getConnection();
-        _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        _producerSession = _producerConnection.createSession(true, Session.SESSION_TRANSACTED);
     }
 
     public void testConflation() throws Exception
@@ -85,6 +82,7 @@ public class LastValueQueueTest extends
         for (int msg = 0; msg < MSG_COUNT; msg++)
         {
             _producer.send(nextMessage(msg, _producerSession));
+            _producerSession.commit();
         }
 
         _producer.close();
@@ -95,8 +93,8 @@ public class LastValueQueueTest extends
         _consumerConnection.start();
         Message received;
 
-        List<Message> messages = new ArrayList<Message>();
-        while((received = _consumer.receive(1000))!=null)
+        List<Message> messages = new ArrayList<>();
+        while((received = _consumer.receive(getReceiveTimeout())) != null)
         {
             messages.add(received);
         }
@@ -122,17 +120,14 @@ public class LastValueQueueTest extends
         for (int msg = 0; msg < MSG_COUNT/2; msg++)
         {
             _producer.send(nextMessage(msg, _producerSession));
-
+            _producerSession.commit();
         }
 
-        // HACK to do something synchronous
-        syncProducerSession();
-
         _consumer = _consumerSession.createConsumer(_queue);
         _consumerConnection.start();
         Message received;
-        List<Message> messages = new ArrayList<Message>();
-        while((received = _consumer.receive(1000))!=null)
+        List<Message> messages = new ArrayList<>();
+        while((received = _consumer.receive(getReceiveTimeout())) != null)
         {
             messages.add(received);
         }
@@ -156,17 +151,14 @@ public class LastValueQueueTest extends
         for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++)
         {
             _producer.send(nextMessage(msg, _producerSession));
+            _producerSession.commit();
         }
 
-
-        // HACK to do something synchronous
-        syncProducerSession();
-
         _consumer = _consumerSession.createConsumer(_queue);
         _consumerConnection.start();
 
-        messages = new ArrayList<Message>();
-        while((received = _consumer.receive(1000))!=null)
+        messages = new ArrayList<>();
+        while((received = _consumer.receive(getReceiveTimeout())) != null)
         {
             messages.add(received);
         }
@@ -194,16 +186,14 @@ public class LastValueQueueTest extends
         for (int msg = 0; msg < MSG_COUNT/2; msg++)
         {
             _producer.send(nextMessage(msg, _producerSession));
+            _producerSession.commit();
         }
 
-        // HACK to do something synchronous
-        syncProducerSession();
-
         _consumer = _consumerSession.createConsumer(_queue);
         _consumerConnection.start();
         Message received;
-        List<Message> messages = new ArrayList<Message>();
-        while((received = _consumer.receive(1000))!=null)
+        List<Message> messages = new ArrayList<>();
+        while((received = _consumer.receive(getReceiveTimeout())) != null)
         {
             messages.add(received);
         }
@@ -222,9 +212,7 @@ public class LastValueQueueTest extends
         {
             _producer.send(nextMessage(msg, _producerSession));
         }
-
-        // HACK to do something synchronous
-        syncProducerSession();
+        _producerSession.commit();
 
         // this causes the "old" messages to be released
         _consumerSession.close();
@@ -239,8 +227,8 @@ public class LastValueQueueTest extends
         _consumer = _consumerSession.createConsumer(_queue);
         _consumerConnection.start();
 
-        messages = new ArrayList<Message>();
-        while((received = _consumer.receive(1000))!=null)
+        messages = new ArrayList<>();
+        while((received = _consumer.receive(getReceiveTimeout())) != null)
         {
             messages.add(received);
         }
@@ -266,6 +254,7 @@ public class LastValueQueueTest extends
         for (int msg = 0; msg < MSG_COUNT; msg++)
         {
             _producer.send(nextMessage(msg, _producerSession));
+            _producerSession.commit();
         }
         _producerConnection.start();
         final long queueDepth = getQueueDepth(_producerConnection,_queue);
@@ -285,28 +274,14 @@ public class LastValueQueueTest extends
         for (int msg = 0; msg < MSG_COUNT; msg++)
         {
             _producer.send(nextMessage(msg, _producerSession));
-
+            _producerSession.commit();
         }
 
-        syncProducerSession();
-
-        Queue browseQueue;
-        if(isBroker10())
-        {
-            AMQBindingURL url =
-                    new AMQBindingURL("direct://amq.direct//" + _queueName + "?browse='true'&durable='true'");
-            browseQueue = new AMQQueue(url);
-        }
-        else
-        {
-            browseQueue = _consumerSession.createQueue(_queueName);
-        }
-
-        _consumer = _consumerSession.createConsumer(browseQueue);
+        _consumer = _consumerSession.createConsumer(_queue);
         _consumerConnection.start();
         Message received;
-        List<Message> messages = new ArrayList<Message>();
-        while((received = _consumer.receive(1000))!=null)
+        List<Message> messages = new ArrayList<>();
+        while((received = _consumer.receive(getReceiveTimeout())) != null)
         {
             messages.add(received);
         }
@@ -322,10 +297,9 @@ public class LastValueQueueTest extends
         messages.clear();
 
         _producer.send(nextMessage(MSG_COUNT, _producerSession));
+        _producerSession.commit();
 
-        syncProducerSession();
-
-        while((received = _consumer.receive(1000))!=null)
+        while((received = _consumer.receive(getReceiveTimeout())) != null)
         {
             messages.add(received);
         }
@@ -338,49 +312,28 @@ public class LastValueQueueTest extends
         _producerConnection.close();
     }
 
-    private void syncProducerSession() throws QpidException
-    {
-        if(!isBroker10())
-        {
-            ((AMQSession<?, ?>) _producerSession).sync();
-        }
-    }
-
     public void testConflation2Browsers() throws Exception
     {
         _consumerConnection = getConnection();
         _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-
         createConflationQueue(_producerSession, true);
         _producer = _producerSession.createProducer(_queue);
 
         for (int msg = 0; msg < MSG_COUNT; msg++)
         {
             _producer.send(nextMessage(msg, _producerSession));
+            _producerSession.commit();
         }
 
-        syncProducerSession();
-
-        Queue browseQueue;
-        if(isBroker10())
-        {
-            browseQueue = _consumerSession.createQueue(_queueName);
-        }
-        else
-        {
-            AMQBindingURL url =
-                    new AMQBindingURL("direct://amq.direct//" + _queueName + "?browse='true'&durable='true'");
-            browseQueue = new AMQQueue(url);
-        }
+        _consumer = _consumerSession.createConsumer(_queue);
+        MessageConsumer consumer2 = _consumerSession.createConsumer(_queue);
 
-        _consumer = _consumerSession.createConsumer(browseQueue);
-        MessageConsumer consumer2 = _consumerSession.createConsumer(browseQueue);
         _consumerConnection.start();
-        List<Message> messages = new ArrayList<Message>();
-        List<Message> messages2 = new ArrayList<Message>();
-        Message received  = _consumer.receive(1000);
-        Message received2  = consumer2.receive(1000);
+        List<Message> messages = new ArrayList<>();
+        List<Message> messages2 = new ArrayList<>();
+        Message received  = _consumer.receive(getReceiveTimeout());
+        Message received2  = consumer2.receive(getReceiveTimeout());
 
         while(received!=null || received2!=null)
         {
@@ -394,8 +347,8 @@ public class LastValueQueueTest extends
             }
 
 
-            received  = _consumer.receive(1000);
-            received2  = consumer2.receive(1000);
+            received  = _consumer.receive(getReceiveTimeout());
+            received2  = consumer2.receive(getReceiveTimeout());
 
         }
 
@@ -455,19 +408,15 @@ public class LastValueQueueTest extends
         _consumer = _consumerSession.createConsumer(_queue);
         _consumerConnection.start();
 
-        Map<String, Integer> messageSequenceNumbersByKey = new HashMap<String, Integer>();
+        Map<String, Integer> messageSequenceNumbersByKey = new HashMap<>();
 
         Message message;
         int numberOfShutdownsReceived = 0;
         int numberOfMessagesReceived = 0;
         while(numberOfShutdownsReceived < 2)
         {
-            message = _consumer.receive(5000);
-            if(message == null)
-            {
-                System.err.println("here's a good place for a breakpoint");
-            }
-            assertNotNull("null recieved after " + numberOfMessagesReceived + " messages and " + numberOfShutdownsReceived + " shutdowns", message);
+            message = _consumer.receive(getReceiveTimeout());
+            assertNotNull("null received after " + numberOfMessagesReceived + " messages and " + numberOfShutdownsReceived + " shutdowns", message);
 
             if (message.propertyExists(BackgroundMessageProducer.SHUTDOWN))
             {
@@ -501,7 +450,7 @@ public class LastValueQueueTest extends
         private volatile Exception _exception;
 
         private Thread _thread;
-        private Map<String, Integer> _messageSequenceNumbersByKey = new HashMap<String, Integer>();
+        private Map<String, Integer> _messageSequenceNumbersByKey = new HashMap<>();
         private CountDownLatch _quarterOfMessagesSentLatch = new CountDownLatch(MSG_COUNT/4);
 
         public BackgroundMessageProducer(String threadName)
@@ -585,7 +534,7 @@ public class LastValueQueueTest extends
     {
         if(isBroker10())
         {
-            final Map<String, Object> arguments = new HashMap<String, Object>();
+            final Map<String, Object> arguments = new HashMap<>();
             arguments.put(LastValueQueue.LVQ_KEY, KEY_PROPERTY);
             if(enforceBrowseOnly)
             {
@@ -596,11 +545,13 @@ public class LastValueQueueTest extends
         }
         else
         {
-            final Map<String, Object> arguments = new HashMap<String, Object>();
-            arguments.put("qpid.last_value_queue_key", KEY_PROPERTY);
-            ((AMQSession<?, ?>) session).createQueue(_queueName, false, true, false, arguments);
-            _queue = new AMQQueue("amq.direct", _queueName);
-            ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) _queue);
+            String browserOnly = enforceBrowseOnly ? "mode: browse," : "";
+            String addr = String.format("ADDR:%s; {create: always, %s" +
+                                        "node: {x-declare:{arguments : {'qpid.last_value_queue_key':'%s'}}}}",
+                                        _queueName, browserOnly, KEY_PROPERTY);
+
+            _queue = session.createQueue(addr);
+            session.createConsumer(_queue).close();
         }
     }
 
@@ -613,7 +564,8 @@ public class LastValueQueueTest extends
     {
         Message send = producerSession.createTextMessage("Message: " + msg);
 
-        send.setStringProperty(KEY_PROPERTY, String.valueOf(msg % numberOfUniqueKeyValues));
+        final String keyValue = String.valueOf(msg % numberOfUniqueKeyValues);
+        send.setStringProperty(KEY_PROPERTY, keyValue);
         send.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, msg);
 
         return send;

Modified: qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes?rev=1774130&r1=1774129&r2=1774130&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes (original)
+++ qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes Wed Dec 14 08:38:21 2016
@@ -33,7 +33,6 @@ org.apache.qpid.server.security.acl.Exha
 org.apache.qpid.server.queue.ModelTest#*
 org.apache.qpid.test.unit.topic.DurableSubscriptionTest#*
 org.apache.qpid.test.unit.client.MaxDeliveryCountTest#*
-org.apache.qpid.test.client.queue.LVQTest#*
 org.apache.qpid.systest.rest.ConnectionRestTest#*
 
 

Modified: qpid/java/trunk/test-profiles/JavaPre010Excludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/JavaPre010Excludes?rev=1774130&r1=1774129&r2=1774130&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/JavaPre010Excludes (original)
+++ qpid/java/trunk/test-profiles/JavaPre010Excludes Wed Dec 14 08:38:21 2016
@@ -56,9 +56,6 @@ org.apache.qpid.test.unit.client.connect
 // uses AMQP 0-10 related properties
 org.apache.qpid.test.unit.message.JMSPropertiesTest#testQpidExtensionProperties
 
-// LVQ tests use new address syntax and can not be run on 0.9.1 profiles
-org.apache.qpid.test.client.queue.LVQTest#*
-
 // Verification of unique client id is 0-10 specific
 org.apache.qpid.test.unit.client.connection.ConnectionTest#testClientIDVerificationForSameUser
 org.apache.qpid.test.unit.client.connection.ConnectionTest#testClientIDVerificationForDifferentUsers



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org