You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/28 16:52:35 UTC

svn commit: r1771766 - in /qpid/java/trunk: systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java test-profiles/Java10UninvestigatedTestsExcludes

Author: rgodfrey
Date: Mon Nov 28 16:52:35 2016
New Revision: 1771766

URL: http://svn.apache.org/viewvc?rev=1771766&view=rev
Log:
QPID-7546 : LastValueQueueTest

Modified:
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
    qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java?rev=1771766&r1=1771765&r2=1771766&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java Mon Nov 28 16:52:35 2016
@@ -41,7 +41,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.QpidException;
-import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
@@ -80,7 +79,7 @@ public class LastValueQueueTest extends
         _consumerConnection = getConnection();
         _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-        createConflationQueue(_producerSession);
+        createConflationQueue(_producerSession, false);
         _producer = _producerSession.createProducer(_queue);
 
         for (int msg = 0; msg < MSG_COUNT; msg++)
@@ -117,7 +116,7 @@ public class LastValueQueueTest extends
         _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 
 
-        createConflationQueue(_producerSession);
+        createConflationQueue(_producerSession, false);
         _producer = _producerSession.createProducer(_queue);
 
         for (int msg = 0; msg < MSG_COUNT/2; msg++)
@@ -127,7 +126,7 @@ public class LastValueQueueTest extends
         }
 
         // HACK to do something synchronous
-        ((AMQSession<?,?>)_producerSession).sync();
+        syncProducerSession();
 
         _consumer = _consumerSession.createConsumer(_queue);
         _consumerConnection.start();
@@ -161,7 +160,7 @@ public class LastValueQueueTest extends
 
 
         // HACK to do something synchronous
-        ((AMQSession<?,?>)_producerSession).sync();
+        syncProducerSession();
 
         _consumer = _consumerSession.createConsumer(_queue);
         _consumerConnection.start();
@@ -189,7 +188,7 @@ public class LastValueQueueTest extends
         _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 
 
-        createConflationQueue(_producerSession);
+        createConflationQueue(_producerSession, false);
         _producer = _producerSession.createProducer(_queue);
 
         for (int msg = 0; msg < MSG_COUNT/2; msg++)
@@ -198,7 +197,7 @@ public class LastValueQueueTest extends
         }
 
         // HACK to do something synchronous
-        ((AMQSession<?,?>)_producerSession).sync();
+        syncProducerSession();
 
         _consumer = _consumerSession.createConsumer(_queue);
         _consumerConnection.start();
@@ -225,8 +224,7 @@ public class LastValueQueueTest extends
         }
 
         // HACK to do something synchronous
-        ((AMQSession<?,?>)_producerSession).sync();
-
+        syncProducerSession();
 
         // this causes the "old" messages to be released
         _consumerSession.close();
@@ -262,15 +260,15 @@ public class LastValueQueueTest extends
         _consumerConnection = getConnection();
         _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-        createConflationQueue(_producerSession);
+        createConflationQueue(_producerSession, false);
         _producer = _producerSession.createProducer(_queue);
 
         for (int msg = 0; msg < MSG_COUNT; msg++)
         {
             _producer.send(nextMessage(msg, _producerSession));
         }
-
-        final long queueDepth = ((AMQSession<?, ?>)_producerSession).getQueueDepth((AMQDestination)_queue, true);
+        _producerConnection.start();
+        final long queueDepth = getQueueDepth(_producerConnection,_queue);
 
         assertEquals(10, queueDepth);
     }
@@ -281,7 +279,7 @@ public class LastValueQueueTest extends
         _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
 
-        createConflationQueue(_producerSession);
+        createConflationQueue(_producerSession, true);
         _producer = _producerSession.createProducer(_queue);
 
         for (int msg = 0; msg < MSG_COUNT; msg++)
@@ -290,10 +288,19 @@ public class LastValueQueueTest extends
 
         }
 
-        ((AMQSession<?,?>)_producerSession).sync();
+        syncProducerSession();
 
-        AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'");
-        AMQQueue browseQueue = new AMQQueue(url);
+        Queue browseQueue;
+        if(isBroker10())
+        {
+            AMQBindingURL url =
+                    new AMQBindingURL("direct://amq.direct//" + _queueName + "?browse='true'&durable='true'");
+            browseQueue = new AMQQueue(url);
+        }
+        else
+        {
+            browseQueue = _consumerSession.createQueue(_queueName);
+        }
 
         _consumer = _consumerSession.createConsumer(browseQueue);
         _consumerConnection.start();
@@ -316,7 +323,7 @@ public class LastValueQueueTest extends
 
         _producer.send(nextMessage(MSG_COUNT, _producerSession));
 
-        ((AMQSession<?,?>)_producerSession).sync();
+        syncProducerSession();
 
         while((received = _consumer.receive(1000))!=null)
         {
@@ -331,13 +338,21 @@ public class LastValueQueueTest extends
         _producerConnection.close();
     }
 
+    private void syncProducerSession() throws QpidException
+    {
+        if(!isBroker10())
+        {
+            ((AMQSession<?, ?>) _producerSession).sync();
+        }
+    }
+
     public void testConflation2Browsers() throws Exception
     {
         _consumerConnection = getConnection();
         _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
 
-        createConflationQueue(_producerSession);
+        createConflationQueue(_producerSession, true);
         _producer = _producerSession.createProducer(_queue);
 
         for (int msg = 0; msg < MSG_COUNT; msg++)
@@ -345,10 +360,19 @@ public class LastValueQueueTest extends
             _producer.send(nextMessage(msg, _producerSession));
         }
 
-        ((AMQSession<?,?>)_producerSession).sync();
+        syncProducerSession();
 
-        AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'");
-        AMQQueue browseQueue = new AMQQueue(url);
+        Queue browseQueue;
+        if(isBroker10())
+        {
+            AMQBindingURL url =
+                    new AMQBindingURL("direct://amq.direct//" + _queueName + "?browse='true'&durable='true'");
+            browseQueue = new AMQQueue(url);
+        }
+        else
+        {
+            browseQueue = _consumerSession.createQueue(_queueName);
+        }
 
         _consumer = _consumerSession.createConsumer(browseQueue);
         MessageConsumer consumer2 = _consumerSession.createConsumer(browseQueue);
@@ -394,7 +418,7 @@ public class LastValueQueueTest extends
 
     public void testParallelProductionAndConsumption() throws Exception
     {
-        createConflationQueue(_producerSession);
+        createConflationQueue(_producerSession, false);
 
         // Start producing threads that send messages
         BackgroundMessageProducer messageProducer1 = new BackgroundMessageProducer("Message sender1");
@@ -422,9 +446,9 @@ public class LastValueQueueTest extends
     {
         producer.waitUntilQuarterOfMessagesSentToEncourageConflation();
 
-        _consumerConnection = getConnection();
-        int smallPrefetchToEncourageConflation = 1;
-        _consumerSession = ((AMQConnection)_consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE, smallPrefetchToEncourageConflation);
+        _consumerConnection = getConnectionWithPrefetch(1);
+
+        _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
         LOGGER.info("Starting to receive");
 
@@ -557,13 +581,27 @@ public class LastValueQueueTest extends
         }
     }
 
-    private void createConflationQueue(Session session) throws QpidException
+    private void createConflationQueue(Session session, final boolean enforceBrowseOnly) throws QpidException, JMSException
     {
-        final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put("qpid.last_value_queue_key",KEY_PROPERTY);
-        ((AMQSession<?,?>) session).createQueue(_queueName, false, true, false, arguments);
-        _queue = new AMQQueue("amq.direct", _queueName);
-        ((AMQSession<?,?>) session).declareAndBind((AMQDestination)_queue);
+        if(isBroker10())
+        {
+            final Map<String, Object> arguments = new HashMap<String, Object>();
+            arguments.put(LastValueQueue.LVQ_KEY, KEY_PROPERTY);
+            if(enforceBrowseOnly)
+            {
+                arguments.put("ensureNondestructiveConsumers", true);
+            }
+            createEntityUsingAmqpManagement(_queueName, session, "org.apache.qpid.LastValueQueue", arguments);
+            _queue = session.createQueue(_queueName);
+        }
+        else
+        {
+            final Map<String, Object> arguments = new HashMap<String, Object>();
+            arguments.put("qpid.last_value_queue_key", KEY_PROPERTY);
+            ((AMQSession<?, ?>) session).createQueue(_queueName, false, true, false, arguments);
+            _queue = new AMQQueue("amq.direct", _queueName);
+            ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) _queue);
+        }
     }
 
     private Message nextMessage(int msg, Session producerSession) throws JMSException

Modified: qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes?rev=1771766&r1=1771765&r2=1771766&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes (original)
+++ qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes Mon Nov 28 16:52:35 2016
@@ -42,7 +42,6 @@ org.apache.qpid.server.queue.ProducerFlo
 org.apache.qpid.server.queue.MultipleTransactedBatchProducerTest#*
 org.apache.qpid.server.queue.ModelTest#*
 org.apache.qpid.server.queue.LiveQueueOperationsTest#*
-org.apache.qpid.server.queue.LastValueQueueTest#*
 org.apache.qpid.server.queue.EnsureNondestructiveConsumersTest#*
 org.apache.qpid.server.queue.DefaultFiltersTest#*
 org.apache.qpid.server.queue.ConsumerPriorityTest#*



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org