You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/12/14 08:38:22 UTC
svn commit: r1774130 - in /qpid/java/trunk:
systests/src/test/java/org/apache/qpid/server/queue/
systests/src/test/java/org/apache/qpid/test/client/queue/ test-profiles/
Author: kwall
Date: Wed Dec 14 08:38:21 2016
New Revision: 1774130
URL: http://svn.apache.org/viewvc?rev=1774130&view=rev
Log:
QPID-7546: Refactor LastValueQueueTest to avoid use of non-public APIs (use ADDRs for both the 0-9 and 0-10 paths)
Eliminated LVQTest, which essentially duplicated LastValueQueueTest.
Removed:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/queue/LVQTest.java
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes
qpid/java/trunk/test-profiles/JavaPre010Excludes
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java?rev=1774130&r1=1774129&r2=1774130&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java Wed Dec 14 08:38:21 2016
@@ -41,11 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.QpidException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.url.AMQBindingURL;
public class LastValueQueueTest extends QpidBrokerTestCase
{
@@ -65,13 +61,14 @@ public class LastValueQueueTest extends
private Session _consumerSession;
private MessageConsumer _consumer;
- protected void setUp() throws Exception
+ @Override
+ public void setUp() throws Exception
{
super.setUp();
_queueName = getTestQueueName();
_producerConnection = getConnection();
- _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _producerSession = _producerConnection.createSession(true, Session.SESSION_TRANSACTED);
}
public void testConflation() throws Exception
@@ -85,6 +82,7 @@ public class LastValueQueueTest extends
for (int msg = 0; msg < MSG_COUNT; msg++)
{
_producer.send(nextMessage(msg, _producerSession));
+ _producerSession.commit();
}
_producer.close();
@@ -95,8 +93,8 @@ public class LastValueQueueTest extends
_consumerConnection.start();
Message received;
- List<Message> messages = new ArrayList<Message>();
- while((received = _consumer.receive(1000))!=null)
+ List<Message> messages = new ArrayList<>();
+ while((received = _consumer.receive(getReceiveTimeout())) != null)
{
messages.add(received);
}
@@ -122,17 +120,14 @@ public class LastValueQueueTest extends
for (int msg = 0; msg < MSG_COUNT/2; msg++)
{
_producer.send(nextMessage(msg, _producerSession));
-
+ _producerSession.commit();
}
- // HACK to do something synchronous
- syncProducerSession();
-
_consumer = _consumerSession.createConsumer(_queue);
_consumerConnection.start();
Message received;
- List<Message> messages = new ArrayList<Message>();
- while((received = _consumer.receive(1000))!=null)
+ List<Message> messages = new ArrayList<>();
+ while((received = _consumer.receive(getReceiveTimeout())) != null)
{
messages.add(received);
}
@@ -156,17 +151,14 @@ public class LastValueQueueTest extends
for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++)
{
_producer.send(nextMessage(msg, _producerSession));
+ _producerSession.commit();
}
-
- // HACK to do something synchronous
- syncProducerSession();
-
_consumer = _consumerSession.createConsumer(_queue);
_consumerConnection.start();
- messages = new ArrayList<Message>();
- while((received = _consumer.receive(1000))!=null)
+ messages = new ArrayList<>();
+ while((received = _consumer.receive(getReceiveTimeout())) != null)
{
messages.add(received);
}
@@ -194,16 +186,14 @@ public class LastValueQueueTest extends
for (int msg = 0; msg < MSG_COUNT/2; msg++)
{
_producer.send(nextMessage(msg, _producerSession));
+ _producerSession.commit();
}
- // HACK to do something synchronous
- syncProducerSession();
-
_consumer = _consumerSession.createConsumer(_queue);
_consumerConnection.start();
Message received;
- List<Message> messages = new ArrayList<Message>();
- while((received = _consumer.receive(1000))!=null)
+ List<Message> messages = new ArrayList<>();
+ while((received = _consumer.receive(getReceiveTimeout())) != null)
{
messages.add(received);
}
@@ -222,9 +212,7 @@ public class LastValueQueueTest extends
{
_producer.send(nextMessage(msg, _producerSession));
}
-
- // HACK to do something synchronous
- syncProducerSession();
+ _producerSession.commit();
// this causes the "old" messages to be released
_consumerSession.close();
@@ -239,8 +227,8 @@ public class LastValueQueueTest extends
_consumer = _consumerSession.createConsumer(_queue);
_consumerConnection.start();
- messages = new ArrayList<Message>();
- while((received = _consumer.receive(1000))!=null)
+ messages = new ArrayList<>();
+ while((received = _consumer.receive(getReceiveTimeout())) != null)
{
messages.add(received);
}
@@ -266,6 +254,7 @@ public class LastValueQueueTest extends
for (int msg = 0; msg < MSG_COUNT; msg++)
{
_producer.send(nextMessage(msg, _producerSession));
+ _producerSession.commit();
}
_producerConnection.start();
final long queueDepth = getQueueDepth(_producerConnection,_queue);
@@ -285,28 +274,14 @@ public class LastValueQueueTest extends
for (int msg = 0; msg < MSG_COUNT; msg++)
{
_producer.send(nextMessage(msg, _producerSession));
-
+ _producerSession.commit();
}
- syncProducerSession();
-
- Queue browseQueue;
- if(isBroker10())
- {
- AMQBindingURL url =
- new AMQBindingURL("direct://amq.direct//" + _queueName + "?browse='true'&durable='true'");
- browseQueue = new AMQQueue(url);
- }
- else
- {
- browseQueue = _consumerSession.createQueue(_queueName);
- }
-
- _consumer = _consumerSession.createConsumer(browseQueue);
+ _consumer = _consumerSession.createConsumer(_queue);
_consumerConnection.start();
Message received;
- List<Message> messages = new ArrayList<Message>();
- while((received = _consumer.receive(1000))!=null)
+ List<Message> messages = new ArrayList<>();
+ while((received = _consumer.receive(getReceiveTimeout())) != null)
{
messages.add(received);
}
@@ -322,10 +297,9 @@ public class LastValueQueueTest extends
messages.clear();
_producer.send(nextMessage(MSG_COUNT, _producerSession));
+ _producerSession.commit();
- syncProducerSession();
-
- while((received = _consumer.receive(1000))!=null)
+ while((received = _consumer.receive(getReceiveTimeout())) != null)
{
messages.add(received);
}
@@ -338,49 +312,28 @@ public class LastValueQueueTest extends
_producerConnection.close();
}
- private void syncProducerSession() throws QpidException
- {
- if(!isBroker10())
- {
- ((AMQSession<?, ?>) _producerSession).sync();
- }
- }
-
public void testConflation2Browsers() throws Exception
{
_consumerConnection = getConnection();
_consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
createConflationQueue(_producerSession, true);
_producer = _producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
{
_producer.send(nextMessage(msg, _producerSession));
+ _producerSession.commit();
}
- syncProducerSession();
-
- Queue browseQueue;
- if(isBroker10())
- {
- browseQueue = _consumerSession.createQueue(_queueName);
- }
- else
- {
- AMQBindingURL url =
- new AMQBindingURL("direct://amq.direct//" + _queueName + "?browse='true'&durable='true'");
- browseQueue = new AMQQueue(url);
- }
+ _consumer = _consumerSession.createConsumer(_queue);
+ MessageConsumer consumer2 = _consumerSession.createConsumer(_queue);
- _consumer = _consumerSession.createConsumer(browseQueue);
- MessageConsumer consumer2 = _consumerSession.createConsumer(browseQueue);
_consumerConnection.start();
- List<Message> messages = new ArrayList<Message>();
- List<Message> messages2 = new ArrayList<Message>();
- Message received = _consumer.receive(1000);
- Message received2 = consumer2.receive(1000);
+ List<Message> messages = new ArrayList<>();
+ List<Message> messages2 = new ArrayList<>();
+ Message received = _consumer.receive(getReceiveTimeout());
+ Message received2 = consumer2.receive(getReceiveTimeout());
while(received!=null || received2!=null)
{
@@ -394,8 +347,8 @@ public class LastValueQueueTest extends
}
- received = _consumer.receive(1000);
- received2 = consumer2.receive(1000);
+ received = _consumer.receive(getReceiveTimeout());
+ received2 = consumer2.receive(getReceiveTimeout());
}
@@ -455,19 +408,15 @@ public class LastValueQueueTest extends
_consumer = _consumerSession.createConsumer(_queue);
_consumerConnection.start();
- Map<String, Integer> messageSequenceNumbersByKey = new HashMap<String, Integer>();
+ Map<String, Integer> messageSequenceNumbersByKey = new HashMap<>();
Message message;
int numberOfShutdownsReceived = 0;
int numberOfMessagesReceived = 0;
while(numberOfShutdownsReceived < 2)
{
- message = _consumer.receive(5000);
- if(message == null)
- {
- System.err.println("here's a good place for a breakpoint");
- }
- assertNotNull("null recieved after " + numberOfMessagesReceived + " messages and " + numberOfShutdownsReceived + " shutdowns", message);
+ message = _consumer.receive(getReceiveTimeout());
+ assertNotNull("null received after " + numberOfMessagesReceived + " messages and " + numberOfShutdownsReceived + " shutdowns", message);
if (message.propertyExists(BackgroundMessageProducer.SHUTDOWN))
{
@@ -501,7 +450,7 @@ public class LastValueQueueTest extends
private volatile Exception _exception;
private Thread _thread;
- private Map<String, Integer> _messageSequenceNumbersByKey = new HashMap<String, Integer>();
+ private Map<String, Integer> _messageSequenceNumbersByKey = new HashMap<>();
private CountDownLatch _quarterOfMessagesSentLatch = new CountDownLatch(MSG_COUNT/4);
public BackgroundMessageProducer(String threadName)
@@ -585,7 +534,7 @@ public class LastValueQueueTest extends
{
if(isBroker10())
{
- final Map<String, Object> arguments = new HashMap<String, Object>();
+ final Map<String, Object> arguments = new HashMap<>();
arguments.put(LastValueQueue.LVQ_KEY, KEY_PROPERTY);
if(enforceBrowseOnly)
{
@@ -596,11 +545,13 @@ public class LastValueQueueTest extends
}
else
{
- final Map<String, Object> arguments = new HashMap<String, Object>();
- arguments.put("qpid.last_value_queue_key", KEY_PROPERTY);
- ((AMQSession<?, ?>) session).createQueue(_queueName, false, true, false, arguments);
- _queue = new AMQQueue("amq.direct", _queueName);
- ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) _queue);
+ String browserOnly = enforceBrowseOnly ? "mode: browse," : "";
+ String addr = String.format("ADDR:%s; {create: always, %s" +
+ "node: {x-declare:{arguments : {'qpid.last_value_queue_key':'%s'}}}}",
+ _queueName, browserOnly, KEY_PROPERTY);
+
+ _queue = session.createQueue(addr);
+ session.createConsumer(_queue).close();
}
}
@@ -613,7 +564,8 @@ public class LastValueQueueTest extends
{
Message send = producerSession.createTextMessage("Message: " + msg);
- send.setStringProperty(KEY_PROPERTY, String.valueOf(msg % numberOfUniqueKeyValues));
+ final String keyValue = String.valueOf(msg % numberOfUniqueKeyValues);
+ send.setStringProperty(KEY_PROPERTY, keyValue);
send.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, msg);
return send;
Modified: qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes?rev=1774130&r1=1774129&r2=1774130&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes (original)
+++ qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes Wed Dec 14 08:38:21 2016
@@ -33,7 +33,6 @@ org.apache.qpid.server.security.acl.Exha
org.apache.qpid.server.queue.ModelTest#*
org.apache.qpid.test.unit.topic.DurableSubscriptionTest#*
org.apache.qpid.test.unit.client.MaxDeliveryCountTest#*
-org.apache.qpid.test.client.queue.LVQTest#*
org.apache.qpid.systest.rest.ConnectionRestTest#*
Modified: qpid/java/trunk/test-profiles/JavaPre010Excludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/JavaPre010Excludes?rev=1774130&r1=1774129&r2=1774130&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/JavaPre010Excludes (original)
+++ qpid/java/trunk/test-profiles/JavaPre010Excludes Wed Dec 14 08:38:21 2016
@@ -56,9 +56,6 @@ org.apache.qpid.test.unit.client.connect
// uses AMQP 0-10 related properties
org.apache.qpid.test.unit.message.JMSPropertiesTest#testQpidExtensionProperties
-// LVQ tests use new address syntax and can not be run on 0.9.1 profiles
-org.apache.qpid.test.client.queue.LVQTest#*
-
// Verification of unique client id is 0-10 specific
org.apache.qpid.test.unit.client.connection.ConnectionTest#testClientIDVerificationForSameUser
org.apache.qpid.test.unit.client.connection.ConnectionTest#testClientIDVerificationForDifferentUsers
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org