You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2018/01/29 07:47:39 UTC
[2/5] activemq-artemis git commit: ARTEMIS-1638 Fixing Purge rollback behaviour (test only)
ARTEMIS-1638 Fixing Purge rollback behaviour (test only)
Having a commit with just a test will make it easy for developers to check out just this branch
and validate the issue.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/69429e4e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/69429e4e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/69429e4e
Branch: refs/heads/master
Commit: 69429e4e230960c2bd29e031fc270a4a72dac148
Parents: c671fa0
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Jan 25 21:17:25 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Jan 26 23:24:55 2018 -0500
----------------------------------------------------------------------
.../integration/amqp/AmqpClientTestSupport.java | 5 +
.../amqp/AmqpPurgeOnNoConsumersTest.java | 127 ++++++++++++++-----
2 files changed, 103 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/69429e4e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index 044183f..a01c975 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -304,6 +304,10 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
}
protected void sendMessages(String destinationName, int count, RoutingType routingType) throws Exception {
+ sendMessages(destinationName, count, routingType, false);
+ }
+
+ protected void sendMessages(String destinationName, int count, RoutingType routingType, boolean durable) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
@@ -313,6 +317,7 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
for (int i = 0; i < count; ++i) {
AmqpMessage message = new AmqpMessage();
message.setMessageId("MessageID:" + i);
+ message.setDurable(true);
if (routingType != null) {
message.setMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE.toString(), routingType.getType());
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/69429e4e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java
index 9ded61c..ae4849b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java
@@ -16,10 +16,20 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
@@ -28,57 +38,116 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Test;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
public class AmqpPurgeOnNoConsumersTest extends AmqpClientTestSupport {
+ @Override
+ protected String getConfiguredProtocols() {
+ return "AMQP,OPENWIRE,CORE";
+ }
+
@Test(timeout = 60000)
public void testQueueReceiverReadMessage() throws Exception {
+ AmqpConnection connection = null;
String queue = "purgeQueue";
SimpleString ssQueue = new SimpleString(queue);
+
server.addAddressInfo(new AddressInfo(ssQueue, RoutingType.ANYCAST));
server.createQueue(ssQueue, RoutingType.ANYCAST, ssQueue, null, true, false, 1, true, false);
AmqpClient client = createAmqpClient();
- AmqpConnection connection = addConnection(client.connect());
+ connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
final AmqpReceiver receiver = session.createReceiver(queue);
- Queue queueView = getProxyToQueue(queue);
+ QueueImpl queueView = (QueueImpl)getProxyToQueue(queue);
+ assertEquals(0L, queueView.getPageSubscription().getPagingStore().getAddressSize());
assertEquals(0, queueView.getMessageCount());
- Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- for (int i = 0; i < 4; i++) {
- try {
- AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
- receive.accept();
- assertNotNull(receive);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- try {
- receiver.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- });
+ sendMessages(queue, 5, null, true);
- t.start();
+ Wait.assertEquals(5, queueView::getMessageCount);
receiver.flow(5);
- sendMessages(queue, 5);
-
- t.join(5000);
+ for (int i = 0; i < 4; i++) {
+ try {
+ AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
+ receive.accept();
+ assertNotNull(receive);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ try {
+ receiver.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
Wait.assertEquals(0, queueView::getMessageCount);
+ assertEquals(0L, queueView.getPageSubscription().getPagingStore().getAddressSize());
connection.close();
+
+ server.stop();
+
+ server.start();
+
+ queueView = (QueueImpl)getProxyToQueue(queue);
+
+ assertEquals(0, queueView.getMessageCount());
+ assertEquals(0L, queueView.getPageSubscription().getPagingStore().getAddressSize());
+ }
+
+
+ // I'm adding the core test here to compare semantics between AMQP and core on this test.
+ @Test(timeout = 60000)
+ public void testPurgeQueueCoreRollback() throws Exception {
+ String queue = "purgeQueue";
+ SimpleString ssQueue = new SimpleString(queue);
+ server.addAddressInfo(new AddressInfo(ssQueue, RoutingType.ANYCAST));
+ server.createQueue(ssQueue, RoutingType.ANYCAST, ssQueue, null, true, false, 1, true, false);
+
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:5672");
+ Connection connection = cf.createConnection();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(session.createQueue("purgeQueue"));
+
+ javax.jms.Queue jmsQueue = session.createQueue(queue);
+ MessageConsumer consumer = session.createConsumer(jmsQueue);
+
+ for (int i = 0; i < 10; i++) {
+ Message message = session.createTextMessage("hello " + i);
+ producer.send(message);
+ }
+ session.commit();
+
+ QueueImpl queueView = (QueueImpl)getProxyToQueue(queue);
+
+ Wait.assertEquals(10, queueView::getMessageCount);
+
+ connection.start();
+
+
+ for (int i = 0; i < 10; i++) {
+ TextMessage txt = (TextMessage)consumer.receive(1000);
+ assertNotNull(txt);
+ assertEquals("hello " + i, txt.getText());
+ }
+ consumer.close();
+ session.rollback();
+ connection.close();
+
+ Wait.assertEquals(0, queueView::getMessageCount);
+
+ server.stop();
+
+ server.start();
+
+ queueView = (QueueImpl)getProxyToQueue(queue);
+
+ assertEquals(0, queueView.getMessageCount());
+ assertEquals(0L, queueView.getPageSubscription().getPagingStore().getAddressSize());
}
}