You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/28 16:52:35 UTC
svn commit: r1771766 - in /qpid/java/trunk:
systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
test-profiles/Java10UninvestigatedTestsExcludes
Author: rgodfrey
Date: Mon Nov 28 16:52:35 2016
New Revision: 1771766
URL: http://svn.apache.org/viewvc?rev=1771766&view=rev
Log:
QPID-7546 : LastValueQueueTest
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java?rev=1771766&r1=1771765&r2=1771766&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java Mon Nov 28 16:52:35 2016
@@ -41,7 +41,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.QpidException;
-import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
@@ -80,7 +79,7 @@ public class LastValueQueueTest extends
_consumerConnection = getConnection();
_consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- createConflationQueue(_producerSession);
+ createConflationQueue(_producerSession, false);
_producer = _producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
@@ -117,7 +116,7 @@ public class LastValueQueueTest extends
_consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- createConflationQueue(_producerSession);
+ createConflationQueue(_producerSession, false);
_producer = _producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT/2; msg++)
@@ -127,7 +126,7 @@ public class LastValueQueueTest extends
}
// HACK to do something synchronous
- ((AMQSession<?,?>)_producerSession).sync();
+ syncProducerSession();
_consumer = _consumerSession.createConsumer(_queue);
_consumerConnection.start();
@@ -161,7 +160,7 @@ public class LastValueQueueTest extends
// HACK to do something synchronous
- ((AMQSession<?,?>)_producerSession).sync();
+ syncProducerSession();
_consumer = _consumerSession.createConsumer(_queue);
_consumerConnection.start();
@@ -189,7 +188,7 @@ public class LastValueQueueTest extends
_consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- createConflationQueue(_producerSession);
+ createConflationQueue(_producerSession, false);
_producer = _producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT/2; msg++)
@@ -198,7 +197,7 @@ public class LastValueQueueTest extends
}
// HACK to do something synchronous
- ((AMQSession<?,?>)_producerSession).sync();
+ syncProducerSession();
_consumer = _consumerSession.createConsumer(_queue);
_consumerConnection.start();
@@ -225,8 +224,7 @@ public class LastValueQueueTest extends
}
// HACK to do something synchronous
- ((AMQSession<?,?>)_producerSession).sync();
-
+ syncProducerSession();
// this causes the "old" messages to be released
_consumerSession.close();
@@ -262,15 +260,15 @@ public class LastValueQueueTest extends
_consumerConnection = getConnection();
_consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- createConflationQueue(_producerSession);
+ createConflationQueue(_producerSession, false);
_producer = _producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
{
_producer.send(nextMessage(msg, _producerSession));
}
-
- final long queueDepth = ((AMQSession<?, ?>)_producerSession).getQueueDepth((AMQDestination)_queue, true);
+ _producerConnection.start();
+ final long queueDepth = getQueueDepth(_producerConnection,_queue);
assertEquals(10, queueDepth);
}
@@ -281,7 +279,7 @@ public class LastValueQueueTest extends
_consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- createConflationQueue(_producerSession);
+ createConflationQueue(_producerSession, true);
_producer = _producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
@@ -290,10 +288,19 @@ public class LastValueQueueTest extends
}
- ((AMQSession<?,?>)_producerSession).sync();
+ syncProducerSession();
- AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'");
- AMQQueue browseQueue = new AMQQueue(url);
+ Queue browseQueue;
+ if(isBroker10())
+ {
+ AMQBindingURL url =
+ new AMQBindingURL("direct://amq.direct//" + _queueName + "?browse='true'&durable='true'");
+ browseQueue = new AMQQueue(url);
+ }
+ else
+ {
+ browseQueue = _consumerSession.createQueue(_queueName);
+ }
_consumer = _consumerSession.createConsumer(browseQueue);
_consumerConnection.start();
@@ -316,7 +323,7 @@ public class LastValueQueueTest extends
_producer.send(nextMessage(MSG_COUNT, _producerSession));
- ((AMQSession<?,?>)_producerSession).sync();
+ syncProducerSession();
while((received = _consumer.receive(1000))!=null)
{
@@ -331,13 +338,21 @@ public class LastValueQueueTest extends
_producerConnection.close();
}
+ private void syncProducerSession() throws QpidException
+ {
+ if(!isBroker10())
+ {
+ ((AMQSession<?, ?>) _producerSession).sync();
+ }
+ }
+
public void testConflation2Browsers() throws Exception
{
_consumerConnection = getConnection();
_consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- createConflationQueue(_producerSession);
+ createConflationQueue(_producerSession, true);
_producer = _producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
@@ -345,10 +360,19 @@ public class LastValueQueueTest extends
_producer.send(nextMessage(msg, _producerSession));
}
- ((AMQSession<?,?>)_producerSession).sync();
+ syncProducerSession();
- AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'");
- AMQQueue browseQueue = new AMQQueue(url);
+ Queue browseQueue;
+ if(isBroker10())
+ {
+ AMQBindingURL url =
+ new AMQBindingURL("direct://amq.direct//" + _queueName + "?browse='true'&durable='true'");
+ browseQueue = new AMQQueue(url);
+ }
+ else
+ {
+ browseQueue = _consumerSession.createQueue(_queueName);
+ }
_consumer = _consumerSession.createConsumer(browseQueue);
MessageConsumer consumer2 = _consumerSession.createConsumer(browseQueue);
@@ -394,7 +418,7 @@ public class LastValueQueueTest extends
public void testParallelProductionAndConsumption() throws Exception
{
- createConflationQueue(_producerSession);
+ createConflationQueue(_producerSession, false);
// Start producing threads that send messages
BackgroundMessageProducer messageProducer1 = new BackgroundMessageProducer("Message sender1");
@@ -422,9 +446,9 @@ public class LastValueQueueTest extends
{
producer.waitUntilQuarterOfMessagesSentToEncourageConflation();
- _consumerConnection = getConnection();
- int smallPrefetchToEncourageConflation = 1;
- _consumerSession = ((AMQConnection)_consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE, smallPrefetchToEncourageConflation);
+ _consumerConnection = getConnectionWithPrefetch(1);
+
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
LOGGER.info("Starting to receive");
@@ -557,13 +581,27 @@ public class LastValueQueueTest extends
}
}
- private void createConflationQueue(Session session) throws QpidException
+ private void createConflationQueue(Session session, final boolean enforceBrowseOnly) throws QpidException, JMSException
{
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("qpid.last_value_queue_key",KEY_PROPERTY);
- ((AMQSession<?,?>) session).createQueue(_queueName, false, true, false, arguments);
- _queue = new AMQQueue("amq.direct", _queueName);
- ((AMQSession<?,?>) session).declareAndBind((AMQDestination)_queue);
+ if(isBroker10())
+ {
+ final Map<String, Object> arguments = new HashMap<String, Object>();
+ arguments.put(LastValueQueue.LVQ_KEY, KEY_PROPERTY);
+ if(enforceBrowseOnly)
+ {
+ arguments.put("ensureNondestructiveConsumers", true);
+ }
+ createEntityUsingAmqpManagement(_queueName, session, "org.apache.qpid.LastValueQueue", arguments);
+ _queue = session.createQueue(_queueName);
+ }
+ else
+ {
+ final Map<String, Object> arguments = new HashMap<String, Object>();
+ arguments.put("qpid.last_value_queue_key", KEY_PROPERTY);
+ ((AMQSession<?, ?>) session).createQueue(_queueName, false, true, false, arguments);
+ _queue = new AMQQueue("amq.direct", _queueName);
+ ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) _queue);
+ }
}
private Message nextMessage(int msg, Session producerSession) throws JMSException
Modified: qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes?rev=1771766&r1=1771765&r2=1771766&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes (original)
+++ qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes Mon Nov 28 16:52:35 2016
@@ -42,7 +42,6 @@ org.apache.qpid.server.queue.ProducerFlo
org.apache.qpid.server.queue.MultipleTransactedBatchProducerTest#*
org.apache.qpid.server.queue.ModelTest#*
org.apache.qpid.server.queue.LiveQueueOperationsTest#*
-org.apache.qpid.server.queue.LastValueQueueTest#*
org.apache.qpid.server.queue.EnsureNondestructiveConsumersTest#*
org.apache.qpid.server.queue.DefaultFiltersTest#*
org.apache.qpid.server.queue.ConsumerPriorityTest#*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org